主頁 > 資料庫 > 大資料Hadoop之——Flink Table API 和 SQL(單機Kafka)

大資料Hadoop之——Flink Table API 和 SQL(單機Kafka)

2022-05-06 08:22:24 資料庫

目錄
  • 一、Table API 和 Flink SQL 是什么
  • 二、配置Table依賴(scala)
  • 三、兩種 planner(old & blink)的區別
  • 四、Catalogs
    • 1)Catalog概述
    • 2)Catalog 型別
    • 3)如何創建 Flink 表并將其注冊到 Catalog
      • 1、下載flink-sql-connector-hive相關版本jar包,放在$FLINK_HOME/lib目錄下
      • 2、添加Maven 依賴
      • 2、使用 SQL DDL
  • 五、SQL 客戶端
    • 1)啟動 SQL 客戶端命令列界面
    • 2)執行 SQL 查詢
      • 1、standalone模式(默認)
      • 2、yarn-session模式(常駐集群)
      • 3、啟動sql-client on yarn-session(測驗驗證)
    • 3)CLI 為維護和可視化結果提供三種模式
    • 4)查看幫助
    • 5)flink1.14.3中集成hive3.1.2(HiveCatalog )
      • 1、使用 Flink 提供的 Hive jar
      • 2、配置hive-site.xml并啟動metastore服務和hiveserver2服務
      • 3、啟動flink集群(on yarn)
      • 3、配置flink sql
  • 六、表執行環境與表介紹
    • 1)創建表的執行環境(TableEnvironment)
    • 2)在 Catalog 中注冊表
      • 1、表(Table)的概念
      • 2、臨時表(Temporary Table)和永久表(Permanent Table)
      • 3、屏蔽(Shadowing)
  • 七、Table API
  • 八、SQL
  • 九、Table & SQL Connectors
    • 1)概述
    • 2)Kafka安裝(單機)
      • 1、下載安裝包
      • 2、配置環境變數
      • 3、配置kafka
      • 3、配置ZooKeeper
      • 4、啟動kafka
      • 5、驗證
    • 3)Formats
      • 1、JSON Format
      • 2、CSV Format
    • 4)Apache Kafka SQL 連接器
      • 1、下載對應的jar包到$FLINK_HOME/lib目錄下
      • 2、創建 Kafka 表

Table API 和 SQL 集成在同一套 API 中, 這套 API 的核心概念是Table,用作查詢的輸入和輸出,這套 API 都是批處理和流處理統一的上層 API,這意味著在無邊界的實時資料流有邊界的歷史記錄資料流上,關系型 API 會以相同的語意執行查詢,并產生相同的結果,Table API 和 SQL借助了 Apache Calcite 來進行查詢的決議,校驗以及優化,它們可以與 DataStream 和DataSet API 無縫集成,并支持用戶自定義的標量函式,聚合函式以及表值函式,

Flink官方下載:https://flink.apache.org/downloads.html
官方檔案(最新版本):https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/common/
官方檔案(當前最新穩定版1.14.3):https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/common/

maven地址:

https://search.maven.org/
https://mvnrepository.com/

二、配置Table依賴(scala)

首先先配置flink基礎依賴

【問題提示】官方使用的2.11版本,但是我這里使用的2.12版本,

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  <version>1.14.3</version>
</dependency>

除此之外,如果你想在 IDE 本地運行你的程式,你需要添加下面的模塊,具體用哪個取決于你使用哪個 Planner

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.12</artifactId>
  <version>1.14.3</version>
</dependency>

添加擴展依賴(可選)

如果你想實作自定義格式或連接器 用于(反)序列化行或一組用戶定義的函式,下面的依賴就足夠了,編譯出來的 jar 檔案可以直接給 SQL Client 使用:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.14.3</version>
</dependency>

【溫馨提示】如果需要本地直接運行,得先把scope先注釋掉,要不然會報如下錯誤:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/table/api/bridge/scala/StreamTableEnvironment$

  • flink-table-planner:planner 計劃器,是 table API 最主要的部分,提供了運行時環境和生
    成程式執行計劃的 planner;
  • flink-table-api-scala-bridge:bridge 橋接器,主要負責 table API 和 DataStream/DataSet API
    的連接支持,按照語言分 java 和 scala,
  • flink-table-common:當然,如果想使用用戶自定義函式,或是跟 kafka 做連接,需要有一個 SQL client,這個包含在 flink-table-common 里,

【溫馨提示】這里的flink-table-planner和flink-table-api-scala-bridge兩個依賴,是 IDE 環境下運行需要添加的;如果是生產環境,lib 目錄下默認已經有了 planner,就只需要有 bridge 就可以了,

三、兩種 planner(old & blink)的區別

  1. 批流統一Blink 將批處理作業,視為流式處理的特殊情況,所以,blink 不支持表和
    DataSet 之間的轉換
    ,批處理作業將不轉換為 DataSet 應用程式,而是跟流處理一樣,轉換
    為 DataStream 程式來處理
    ,因 為 批 流 統 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的StreamTableSource 代替,
  2. Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog,
  3. 舊 planner 和 Blink planner 的 FilterableTableSource 實作不兼容,舊的 planner 會把
    PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會把 Expressions 下推,
  4. 基于字串的鍵值配置選項僅適用于 Blink planner,
  5. PlannerConfig 在兩個 planner 中的實作不同,
  6. Blink planner 會將多個 sink 優化在一個 DAG 中(僅在 TableEnvironment 上受支持,而
    在 StreamTableEnvironment 上不受支持),而舊 planner 的優化總是將每一個 sink 放在一個新
    的 DAG 中,其中所有 DAG 彼此獨立,
  7. 舊的 planner 不支持目錄統計,而 Blink planner 支持,

四、Catalogs

官方檔案:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/catalogs/

1)Catalog概述

  • Catalog 提供了元資料資訊,例如資料庫、表、磁區、視圖以及資料庫或其他外部系統中存盤的函式和資訊,

  • 資料處理最關鍵的方面之一是管理元資料, 元資料可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF, 元資料也可以是持久化的,例如 Hive Metastore 中的元資料,Catalog 提供了一個統一的API,用于管理元資料,并使其可以從 Table API 和 SQL 查詢陳述句中來訪問,

2)Catalog 型別

  • GenericInMemoryCatalog:GenericInMemoryCatalog 是基于記憶體實作的 Catalog,所有元資料只在 session 的生命周期內可用
  • JdbcCatalog:JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協議連接到關系資料庫,Postgres CatalogMySQL Catalog 是目前 JDBC Catalog 僅有的兩種實作, 參考 JdbcCatalog 檔案 獲取關于配置 JDBC catalog 的詳細資訊,
  • HiveCatalog:HiveCatalog 有兩個用途:作為原生 Flink 元資料的持久化存盤,以及作為讀寫現有 Hive 元資料的介面, Flink 的 Hive 檔案 提供了有關設定 HiveCatalog 以及訪問現有 Hive 元資料的詳細資訊,

【溫馨提示】Hive Metastore 以小寫形式存盤所有元資料物件名稱,而 GenericInMemoryCatalog 區分大小寫,

  • 用戶自定義 Catalog:Catalog 是可擴展的,用戶可以通過實作 Catalog 介面來開發自定義 Catalog, 想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實作自定義的 Catalog 之外,還需要為這個 Catalog 實作對應的 CatalogFactory 介面,CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog, 這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 并初始化相應的 Catalog 實體,

# 登錄安裝flink的機器
$ cd /opt/bigdata/hadoop/server/flink-1.14.3/lib
$ wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.11/1.14.3/flink-sql-connector-hive-3.1.2_2.11-1.14.3.jar

2、添加Maven 依賴

如果您在構建自己的應用程式,則需要在 mvn 檔案中添加以下依賴項, 您應該在運行時添加以上的這些依賴項,而不要在已生成的 jar 檔案中去包含它們,官方檔案


hive 版本

$ hive --version


Maven依賴配置如下(這里不使用最新版,使用1.14.3):
使用新版,一般也不建議使用最新版,會有如下報錯:

Cannot resolve org.apache.flink:flink-table-api-java-bridge_2.12:1.15-SNAPSHOT

<!-- Flink Dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
    <scope>provided</scope>
</dependency>

還需要添加如下依賴,要不然會報如下錯誤:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf

version欄位是hadoop版本,查看hadoop版本(hadoop version)

<!--hadoop start-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>3.3.1</version>
</dependency>
<!--hadoop end-->

2、使用 SQL DDL

用戶可以使用 DDL 通過 Table API 或者 SQL Client 在 Catalog 中創建表,

// 創建tableEnv
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    //.inBatchMode()
    .build()

val tableEnv = TableEnvironment.create(settings)

// Create a HiveCatalog 
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in current catalog and database.

用戶可以用編程的方式使用Java 或者 Scala 來創建 Catalog 表,

import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog

val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))

// Create a catalog table
val schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build()

tableEnv.createTable("myhive.mydb.mytable", TableDescriptor.forConnector("kafka")
    .schema(schema)
    // …
    .build())

val tables = catalog.listTables("mydb") // tables should contain "mytable"

五、SQL 客戶端

官方檔案:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/

1)啟動 SQL 客戶端命令列界面

SQL Client 腳本也位于 Flink 的 bin 目錄中,將來,用戶可以通過啟動嵌入式 standalone 行程或通過連接到遠程 SQL 客戶端網關來啟動 SQL 客戶端命令列界面,目前僅支持 embedded,模式默認值embedded,可以通過以下方式啟動 CLI:

$ cd $FLINK_HOME
$ ./bin/sql-client.sh

或者顯式使用 embedded 模式:

$ ./bin/sql-client.sh embedded

幫助檔案

Flink SQL> HELP;

2)執行 SQL 查詢

這里主要講兩種模式standalone模式和yarn模式,部署環境,可以參考我之前的文章:大資料Hadoop之——實時計算流計算引擎Flink(Flink環境部署)

1、standalone模式(默認)

# 先啟動集群
$ cd $FLINK_HOME
$ ./bin/start-cluster.sh
# 啟動客戶端
$ ./bin/sql-client.sh
# SQL查詢
SELECT 'Hello World';

2、yarn-session模式(常駐集群)

【溫馨提示】yarn-session模式其實就是在yarn上生成一個standalone集群

$ cd $FLINK_HOME
$ bin/yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flink-test -d

### 引數解釋:
# -s 每個TaskManager 的slots 數量
# -jm 1024 表示jobmanager 1024M記憶體 
# -tm 1024表示taskmanager 1024M記憶體 
#-d 任務后臺運行 
### 如果你不希望flink yarn client一直運行,也可以啟動一個后臺運行的yarn session,使用這個引數:-d 或者 --detached,在這種情況下,flink yarn client將會只提交任務到集群然后關閉自己,注意:在這種情況下,無法使用flink停止yarn session,必須使用yarn工具來停止yarn session,
# yarn application -kill $applicationId
#-nm,--name  YARN上為一個自定義的應用設定一個名字

3、啟動sql-client on yarn-session(測驗驗證)

$ cd $FLINK_HOME
# 先把flink集群停掉
$ ./bin/stop-cluster.sh
# 再啟動sql客戶端
$ bin/sql-client.sh embedded -s yarn-session
# SQL查詢
SELECT 'Hello World';

3)CLI 為維護和可視化結果提供三種模式

  • 表格模式(table mode)在記憶體中物體化結果,并將結果用規則的分頁表格可視化展示出來,執行如下命令啟用(默認模式):
SET 'sql-client.execution.result-mode' = 'table';
  • 變更日志模式(changelog mode)不會物體化和可視化結果,而是由插入(+)和撤銷(-)組成的持續查詢產生結果流,執行如下命令啟用:
SET 'sql-client.execution.result-mode' = 'changelog';
  • Tableau模式(tableau mode)更接近傳統的資料庫,會將執行的結果以制表的形式直接打在螢屏之上,具體顯示的內容會取決于作業 執行模式的不同(execution.type):
SET 'sql-client.execution.result-mode' = 'tableau';

你可以用如下查詢來查看三種結果模式的運行情況:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

4)查看幫助

$ ./bin/sql-client.sh --help

SQL CLI已經演示了,這里再演示一下-f接檔案的操作,

$ cat>test.sql<<EOF
show databases;
show tables;
EOF

執行

$ bin/sql-client.sh embedded -s yarn-session -f test.sql

5)flink1.14.3中集成hive3.1.2(HiveCatalog )

HiveCatalog 有兩個用途:作為原生 Flink 元資料的持久化存盤,以及作為讀寫現有 Hive 元資料的介面, Flink 的 Hive 檔案 提供了有關設定 HiveCatalog 以及訪問現有 Hive 元資料的詳細資訊,

$ cd $FLINK_HOME/lib
$ wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.11/1.14.3/flink-sql-connector-hive-3.1.2_2.11-1.14.3.jar

# maven網站上下載地址
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.3/flink-connector-hive_2.12-1.14.3.jar
$ wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar
$ wget https://search.maven.org/remotecontent?filepath=org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
$ wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar

2、配置hive-site.xml并啟動metastore服務和hiveserver2服務

【溫馨提示】清楚hive metastore服務和hiveserver2服務,可以參考我之前的文章:大資料Hadoop之——資料倉庫Hive

hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="https://www.cnblogs.com/liugp/archive/2022/05/05/configuration.xsl"?>
<configuration>

    <!-- 所連接的 MySQL 資料庫的地址,hive_remote2是資料庫,程式會自動創建,自定義就行 -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop-node1:3306/hive_remote2?createDatabaseIfNotExist=true&amp;useSSL=false&amp;serverTimezone=Asia/Shanghai</value>
    </property>

    <!-- MySQL 驅動 -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>MySQL JDBC driver class</description>
    </property>

    <!-- mysql連接用戶 -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>user name for connecting to mysql server</description>
    </property>

    <!-- mysql連接密碼 -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
        <description>password for connecting to mysql server</description>
    </property>

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop-node1:9083</value>
        <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
    </property>

    <!-- host -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop-node1</value>
        <description>Bind host on which to run the HiveServer2 Thrift service.</description>
    </property>

    <!-- hs2埠 默認是1000,為了區別,我這里不使用默認埠-->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>11000</value>
    </property>

    <property>
        <name>hive.metastore.schema.verification</name>
        <value>true</value>
    </property>

</configuration>

啟動服務

$ cd $HIVE_HOME
# hive metastore 服務
$ nohup ./bin/hive --service metastore &
# hiveserver2服務
$ nohup ./bin/hiveserver2 > /dev/null 2>&1 &
# 檢查埠
$ ss -atnlp|grep 9083
$ ss -tanlp|grep 11000

3、啟動flink集群(on yarn)

$ cd $FLINK_HOME
$ bin/yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flink-test -d

在flink1.14+中已經移除sql-client-defaults.yml組態檔了,參考地址:https://issues.apache.org/jira/browse/FLINK-21454

于是我順著這個issue找到了FLIP-163這個鏈接:https://cwiki.apache.org/confluence/display/FLINK/FLIP-163%3A+SQL+Client+Improvements

也就是目前這個sql客戶端還有很多bug,并且使用yaml檔案和本身的命令語法會導致用戶學習成本增加,所以在未來會放棄使用這個配置項,可以通過命令列模式來配置

$ cd $FLINK_HOME
$ bin/sql-client.sh embedded -s yarn-session
# 顯示所有catalog,databases
show catalogs;
show databases;

創建hive catalog

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/conf/'
);
# 切換到myhive 
use catalog myhive;
# 查看資料庫
show databases;
# 使用 Hive 方言(Flink 目前支持兩種 SQL 方言: default 和 hive)

登錄hive客戶端進行驗證

$ cd $HIVE_HOME
$ ./bin/beeline
!connect jdbc:hive2://hadoop-node1:11000
show databases;

六、表執行環境與表介紹

1)創建表的執行環境(TableEnvironment)

TableEnvironment 是 Table API 和 SQL 的核心概念,它負責:

  • 在內部的 catalog 中注冊 Table
  • 注冊外部的 catalog
  • 加載可插拔模塊
  • 執行 SQL 查詢
  • 注冊自定義函式 (scalar、table 或 aggregation)
  • DataStream 和 Table 之間的轉換(面向 StreamTableEnvironment )

Table 總是與特定的 TableEnvironment 系結, 不能在同一條查詢中使用不同 TableEnvironment 中的表,例如,對它們進行 join 或 union 操作, TableEnvironment 可以通過靜態方法 TableEnvironment.create() 創建,

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    //.inBatchMode()
    .build()

val tEnv = TableEnvironment.create(settings)

或者,用戶可以從現有的 StreamExecutionEnvironment 創建一個 StreamTableEnvironment 與 DataStream API 互操作,

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

2)在 Catalog 中注冊表

1、表(Table)的概念

  • TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表,它會維護一個
    Catalog-Table 表之間的 map,
  • 表(Table)是由一個“識別符號”來指定的,由 3 部分組成:Catalog 名、資料庫(database)
    名和物件名(表名),如果沒有指定目錄或資料庫,就使用當前的默認值,
  • 表可以是常規的(Table,表),或者虛擬的(View,視圖),常規表(Table)一般可以
    用來描述外部資料,比如檔案、資料庫表或訊息佇列的資料,也可以直接從 DataStream 轉
    換而來,視圖可以從現有的表中創建,通常是 table API 或者 SQL 查詢的一個結果,

2、臨時表(Temporary Table)和永久表(Permanent Table)

  • 表可以是臨時的,并與單個 Flink 會話(session)的生命周期相關,也可以是永久的,并且在多個 Flink 會話和群集(cluster)中可見,

  • 永久表需要 catalog(例如 Hive Metastore)以維護表的元資料,一旦永久表被創建,它將對任何連接到 catalog 的 Flink 會話可見且持續存在,直至被明確洗掉,

  • 另一方面,臨時表通常保存于記憶體中并且僅在創建它們的 Flink 會話持續期間存在,這些表對于其它會話是不可見的,它們不與任何 catalog 或者資料庫系結但可以在一個命名空間(namespace)中創建,即使它們對應的資料庫被洗掉,臨時表也不會被洗掉,

3、屏蔽(Shadowing)

可以使用與已存在的永久表相同的識別符號去注冊臨時表,臨時表會屏蔽永久表,并且只要臨時表存在,永久表就無法訪問,所有使用該識別符號的查詢都將作用于臨時表,

七、Table API

Table API 是批處理和流處理的統一的關系型 API,Table API 的查詢不需要修改代碼就可以采用批輸入或流輸入來運行Table API 是 SQL 語言的超集,并且是針對 Apache Flink 專門設計的,Table API 集成了 Scala,Java 和 Python 語言的 API,Table API 的查詢是使用 Java,Scala 或 Python 語言嵌入的風格定義的,有諸如自動補全和語法校驗的 IDE 支持,而不是像普通 SQL 一樣使用字串型別的值來指定查詢,

官網檔案已經很詳細了,這里就不重復了:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/tableapi/

八、SQL

本頁面描述了 Flink 所支持的 SQL 語言,包括資料定義語言(Data Definition Language,DDL)、資料操縱語言(Data Manipulation Language,DML)以及查詢語言,Flink 對 SQL 的支持基于實作了 SQL 標準的 Apache Calcite,

本頁面列出了目前 Flink SQL 所支持的所有陳述句:

  • SELECT (Queries)
  • CREATE TABLE, DATABASE, VIEW, FUNCTION
  • DROP TABLE, DATABASE, VIEW, FUNCTION
  • ALTER TABLE, DATABASE, FUNCTION
  • INSERT
  • SQL HINTS
  • DESCRIBE
  • EXPLAIN
  • USE
  • SHOW
  • LOAD
  • UNLOAD

九、Table & SQL Connectors

1)概述

Flink的Table API&SQL程式可以連接到其他外部系統,用于讀寫批處理表和流式表,表源提供對存盤在外部系統(如資料庫、鍵值存盤、訊息佇列或檔案系統)中的資料的訪問,表接收器向外部存盤系統發送表,根據源和匯的型別,它們支持不同的格式,如CSV、Avro、Parquet或ORC,

官方檔案:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/table/overview/

這里主要講一下kafka連接器

2)Kafka安裝(單機)

1、下載安裝包

官方下載地址:http://kafka.apache.org/downloads

$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
$ tar -xvf kafka_2.13-3.1.0.tgz -C ../server/

2、配置環境變數

# ~/.bashrc添加如下內容:
export PATH=$PATH:/opt/bigdata/hadoop/server/kafka_2.13-3.1.0/bin

加載生效

$ source ~/.bashrc

3、配置kafka

$ cd /opt/bigdata/hadoop/server/kafka_2.13-3.1.0/config
$ vi server.properties
#添加以下內容:
broker.id=0
listeners=PLAINTEXT://hadoop-node1:9092
zookeeper.connect=hadoop-node1:2181
# 可以配置多個:zookeeper.connect=hadoop-node1:2181,hadoop-node2:2181,hadoop-node3:2181

【溫馨提示】其中0.0.0.0是同時監聽localhost(127.0.0.1)和內網IP(例如hadoop-node2或192.168.100.105),建議改為localhost或c1或192.168.0.113,每臺機的broker.id要設定一個唯一的值,

3、配置ZooKeeper

新版Kafka已內置了ZooKeeper,如果沒有其它大資料組件需要使用ZooKeeper的話,直接用內置的會更方便維護,

$ cd /opt/bigdata/hadoop/server/kafka_2.13-3.1.0/config
$ echo 0 > /tmp/zookeeper/myid
$ vi zookeeper.properties
#注釋掉
#maxClientCnxns=0

#設定連接引數,添加如下配置
#為zk的基本時間單元,毫秒
tickTime=2000
#Leader-Follower初始通信時限 tickTime*10
initLimit=10
#Leader-Follower同步通信時限 tickTime*5
syncLimit=5

#設定broker Id的服務地址
#hadoop-node1對應于前面在hosts里面配置的主機映射,0是broker.id, 2888是資料同步和訊息傳遞埠,3888是選舉埠
server.0=hadoop-node1:2888:3888

4、啟動kafka

【溫馨提示】kafka啟動時先啟動zookeeper,再啟動kafka;關閉時相反,先關閉kafka,再關閉zookeeper

$ cd /opt/bigdata/hadoop/server/kafka_2.13-3.1.0
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ ./bin/kafka-server-start.sh -daemon config/server.properties
$ jsp
# 會看到jps、QuorumPeerMain、Kafka

5、驗證

#創建topic
kafka-topics.sh --bootstrap-server hadoop-node1:9092 --create --topic topic1 --partitions 8 --replication-factor 1

#列出所有topic
kafka-topics.sh --bootstrap-server hadoop-node1:9092 --list

#列出所有topic的資訊
kafka-topics.sh --bootstrap-server hadoop-node1:9092 --describe

#列出指定topic的資訊
kafka-topics.sh --bootstrap-server hadoop-node1:9092 --describe --topic topic1

#生產者(訊息發送程式)
kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic topic1

#消費者(訊息接收程式)
kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic topic1

這里只是搭建一個單機版的只為下面做實驗用,更對關于kafka的內容,可以參考我之前的博文(基于k8s部署):Kafka原理介紹+安裝+基本操作

3)Formats

Flink 提供了一套與表連接器(table connector)一起使用的表格式(table format),表格式是一種存盤格式,定義了如何把二進制資料映射到表的列上,

1、JSON Format

如果是maven,則可以添加如下依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.14.3</version>
</dependency>

這里選擇直接下載jar的方式

$ cd $FLIN_HOME/lib/
$ wget https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-json/1.14.3/flink-json-1.14.3.jar

以下是一個利用 Kafka 以及 JSON Format 構建表的例子:

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'hadoop-node1:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

引數解釋:

json.fail-on-missing-field:當決議欄位缺失時,是跳過當前欄位或行,還是拋出錯誤失敗(默認為 false,即拋出錯誤失敗),
json.ignore-parse-errors:當決議例外時,是跳過當前欄位或行,還是拋出錯誤失敗(默認為 false,即拋出錯誤失敗),如果忽略欄位的決議例外,則會將該欄位值設定為null,

2、CSV Format

$ cd $FLIN_HOME/lib/
$ wget https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-csv/1.14.3/flink-csv-1.14.3.jar

以下是一個使用 Kafka 連接器和 CSV 格式創建表的示例:

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'hadoop-node1:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'csv.ignore-parse-errors' = 'true',
 'csv.allow-comments' = 'true'
)

引數解釋:

csv.ignore-parse-errors:當決議例外時,是跳過當前欄位或行,還是拋出錯誤失敗(默認為 false,即拋出錯誤失敗),如果忽略欄位的決議例外,則會將該欄位值設定為null,
csv.allow-comments:是否允許忽略注釋行(默認不允許),注釋行以 '#' 作為起始字符, 如果允許注釋行,請確保 csv.ignore-parse-errors 也開啟了從而允許空行,

其它格式也類似

4)Apache Kafka SQL 連接器

$ cd $FLIN_HOME/lib/
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.14.3/flink-connector-kafka_2.12-1.14.3.jar

2、創建 Kafka 表

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'hadoop-node1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

引數解釋:

scan.startup.mode:Kafka consumer 的啟動模式,有效值為:earliest-offsetlatest-offsetgroup-offsetstimestampspecific-offsets

  1. group-offsets:從 Zookeeper/Kafka 中某個指定的消費組已提交的偏移量開始,
  2. earliest-offset:從可能的最早偏移量開始,
  3. latest-offset:從最末尾偏移量開始,
  4. timestamp:從用戶為每個 partition 指定的時間戳開始,
  5. specific-offsets:從用戶為每個 partition 指定的偏移量開始,

未完待續~

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/469745.html

標籤:其他

上一篇:開源之夏 2022 重磅來襲,歡迎報名 RadonDB 社區專案

下一篇:Flink 中的事件時間觸發器和處理時間觸發器

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more