主頁 > 資料庫 > 基于袋鼠云實時開發平臺開發 FlinkSQL 任務的實踐探索

基于袋鼠云實時開發平臺開發 FlinkSQL 任務的實踐探索

2023-07-06 09:11:54 資料庫

隨著業務的發展,實時場景在各個?業中變得越來越重要,?論是?融、電商還是物流,實時資料處理都成為了其中的關鍵環節,Flink 憑借其強?的流處理特性、窗?操作以及對各種資料源的?持,成為實時場景下的?選開發?具,

FlinkSQL 通過 SQL 語??向資料開發提供了更友好的互動?式,但是其開發?式和離線開發 SparkSQL 仍然存在較?的差異,袋鼠云實時開發平臺StreamWorks,?直致?于降低 FlinkSQL 的開發門檻,讓更多的資料開發掌握實時開發能?,普及實時計算的應?,

本文將為大家簡單介紹在袋鼠云實時開發平臺開發 FlinkSQL 任務的四種?式,

腳本模式

該模式是最基礎的開發?式,資料開發人員在平臺 IDE 中通過 FlinkSQL 代碼,完成 Flink 表定義和業務邏輯加?,代碼如下:

-- 定義資料源表
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector ' = 'faker ',
'fields .client_ip .expression ' = '#{Internet .publicIpV4Address} ',
'fields .client_identity .expression ' =  '- ',
'fields .userid .expression ' =  '- ',
'fields .user_agent .expression ' = '#{Internet .userAgentAny} ',
'fields .log_time .expression ' =  '#{date .past ' '15 ' ', ' '5 ' ', ' 'SECONDS ' '} ',
'fields .request_line .expression ' = '#{regexify ' '(GET |POST |PUT |PATCH){1} ' '} #{regexify ' '(/search\ .html|/login\ .html|/prod\ .html|c
'fields .status_code .expression ' = '#{regexify ' '(200 |201 |204 |400 |401 |403 |301){1} ' '} ',
'fields .size .expression ' = '#{number .numberBetween ' '100 ' ', ' '10000000 ' '} '
);

-- 定義結果表,  實際應用中會選擇  Kafka、JDBC 等作為結果表
CREATE TABLE client_errors (
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector ' = 'stream-x '
);

-- 寫入資料到結果表
INSERT INTO client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM server_logs
WHERE status_code SIMILAR TO '4[0-9][0-9] ';

腳本模式的優缺點

優點:靈活性?,

缺點:Flink表定義邏輯復雜,如果不熟悉資料源插件,很難記住需要維護哪些引數;如果該任務涉及多張表,代碼塊中存在?段表定義代碼,不?便排查業務邏輯,

向導模式

基于腳本模式存在的缺點,袋鼠云實時開發平臺將 Flink 表定義邏輯抽象成了可視化配置的功能,引導資料開發?員通過??配置化的?式完成 Flink 表定義,讓資料開發更專注在業務邏輯的加?,

file

向導模式是在開發??的配置項中根據??引導,完成 Flink 表的源表、維表、結果表的映射,然后在 IDE 中直接引?,讀寫對應的 Flink 表,完成邏輯開發,

· 平臺默認提供各類資料源的源表、維表、結果表常?配置項;

· 對于各種?級引數,平臺也提供了維護?定義引數的 key/value ?式來滿?靈活性要求,

Catalog 模式

在向導模式中,我們可以借助配置化的?式快速完成表映射,但同時也存在?個問題,這些映射表只能在當前任務中被引?,?法在不同的任務中復?,

但是在真實的實時數倉建設程序中,我們常會遇到下?這種場景:某?個 dws 層級的 kafka topic,會在多個 ads 任務中被作為源表使?,?在每個 ads 任務開發程序中,都需要為同?個 dws topic 做?次相同的 Flink 映射,

為了解決這種重復映射的開發?作,我們可以借助 Flink Catalog 功能,將映射表的元資料資訊進?持久化存盤,這樣就可以在不同的任務中重復引?,具體使??法如下(以平臺的 DT Catalog 為例):

Catalog ?錄維護

· 先在 DT Catalog 下創建?個名為 stream_warehouse 的 catalog

· 然后在該 catalog 下根據數倉層級或者業務域創建不同的 database

file

· ?式?:在?錄中 hover database,根據引導通過配置化?式完成 Flink 表映射

file

· ?式?:在 IDE 中,通過 Create DDL 完成創建,注意要指定對應的 catalog.database 路徑

CREATE TABLE stream_warehouse .dws .orders (
order_uid  BIGINT,
product_id BIGINT,
price      DECIMAL(32, 2),
order_time TIMESTAMP(3)
) WITH (
'connector ' = 'datagen '
);

FlinkSQL 任務開發

完成上面兩個步驟,?張元資料持久化存盤的 Flink 映射表就創建好了,我們在開發任務的時候,就可以直接通過 catalog.database.table 的?式,引?我們需要的表,

INSERT INTO stream_warehouse .ads_db .client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM stream_warehouse .dws_db .server_logs

Demo 模式

學會了上?三種開發?式后,如果你還對 FlinkSQL 的開發邏輯?較陌?,那么建議你可以通過袋鼠云實時開發平臺的代碼模版中?去完成?個完整的任務開發,

在模版中?,我們提供了??余種常?的業務場景及其對應的 FlinkSQL 代碼邏輯,如各類窗?的寫法、各類 Join 的寫法等等,你可以根據真實的業務場景去套?模版,快速地完成任務開發,

file
file

總結

每種開發模式沒有絕對的好壞之分,需要根據不同企業的實時計算場景和階段,采?不同的開發模式,才能真正達到降本增效的目的,

· 當企業剛接觸實時計算,資料開發?員對 FlinkSQL 熟悉度較低時,DEMO 模式是最好的選擇;

· 當企業已經上?實時計算,但是任務量還不?時,腳本模式或者向導模式是不錯的選擇;

· 當企業實時計算達到?定規模,需要進?類似離線數倉的管理?式時,Catalog 模式是最優的選擇,

《數堆疊產品白皮書》:https://www.dtstack.com/resources/1004?src=https://www.cnblogs.com/DTinsight/p/szsm

《資料治理行業實踐白皮書》下載地址:https://www.dtstack.com/resources/1001?src=https://www.cnblogs.com/DTinsight/p/szsm

想了解或咨詢更多有關袋鼠云大資料產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠云官網:https://www.dtstack.com/?src=https://www.cnblogs.com/DTinsight/p/szbky

同時,歡迎對大資料開源專案有興趣的同學加入「袋鼠云開源框架釘釘技術qun」,交流最新開源技術資訊,qun號碼:30537511,專案地址:https://github.com/DTStack

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/556651.html

標籤:大數據

上一篇:ORA-20000: Unable to set values for index xxx: does not exist or insufficient privileges

下一篇:返回列表

標籤雲
其他(162108) Python(38266) JavaScript(25524) Java(18290) C(15238) 區塊鏈(8275) C#(7972) AI(7469) 爪哇(7425) MySQL(7285) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5876) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4611) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2438) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) HtmlCss(1989) .NET技术(1985) 功能(1967) Web開發(1951) C++(1942) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1882) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • 基于袋鼠云實時開發平臺開發 FlinkSQL 任務的實踐探索

    隨著業務的發展,[實時場景](https://www.dtstack.com/dtinsight/streamworks?src=https://www.cnblogs.com/DTinsight/p/szsm)在各個?業中變得越來越重要。?論是?融、電商還是物流,實時資料處理都成為了其中的關鍵環節。Flink 憑借其強?的[流處理特性](https://www.dts ......

    uj5u.com 2023-07-06 09:11:54 more
  • ORA-20000: Unable to set values for index xxx: does not exis

    使用expdp/impdp匯出匯入資料時,遇到ORA-2000錯誤,如下所示: Processing object type SCHEMA_EXPORT/TABLE/GRANT/OWNER_GRANT/OBJECT_GRANTProcessing object type SCHEMA_EXPORT/ ......

    uj5u.com 2023-07-05 09:04:50 more
  • “遠程客戶端操作hdfs創建檔案夾”,驗證環境是否配置成功,以及HDFS

    文章中包含我所遇到的錯誤,進行了HDFS錯誤整改,以及后面有操作創建“遠程客戶端操作hdfs創建檔案夾”,驗證環境是否配置成功的程序。 ......

    uj5u.com 2023-07-05 09:04:09 more
  • 數倉性能調優:大寬表關聯MERGE性能優化

    摘要:本文主要為大家講解在數倉性能調優程序中,關于大寬表關聯MERGE性能優化程序。 本文分享自華為云社區《GaussDB(DWS)性能調優:大寬表關聯MERGE性能優化》,作者:譡里個檔。 【業務背景】 如下MERGE陳述句執行耗時長達2034s MERGE INTO sdifin.hah_ae_l ......

    uj5u.com 2023-07-05 09:03:43 more
  • 數倉性能調優:大寬表關聯MERGE性能優化

    摘要:本文主要為大家講解在數倉性能調優程序中,關于大寬表關聯MERGE性能優化程序。 本文分享自華為云社區《GaussDB(DWS)性能調優:大寬表關聯MERGE性能優化》,作者:譡里個檔。 【業務背景】 如下MERGE陳述句執行耗時長達2034s MERGE INTO sdifin.hah_ae_l ......

    uj5u.com 2023-07-05 09:02:32 more
  • ORA-20000: Unable to set values for index xxx: does not exis

    使用expdp/impdp匯出匯入資料時,遇到ORA-2000錯誤,如下所示: Processing object type SCHEMA_EXPORT/TABLE/GRANT/OWNER_GRANT/OBJECT_GRANTProcessing object type SCHEMA_EXPORT/ ......

    uj5u.com 2023-07-05 09:02:08 more
  • “遠程客戶端操作hdfs創建檔案夾”,驗證環境是否配置成功,以及HDFS

    文章中包含我所遇到的錯誤,進行了HDFS錯誤整改,以及后面有操作創建“遠程客戶端操作hdfs創建檔案夾”,驗證環境是否配置成功的程序。 ......

    uj5u.com 2023-07-05 08:55:55 more
  • sql server 資料庫自動備份

    一丶打開客戶端: 物件資源管理器->管理->維護計劃(右鍵點擊)->維護計劃向導 二丶打開后點擊下一步, 填寫名稱與說明并更改備份計劃 三丶點下一步, 選擇維護任務 四丶點擊下一步, 選擇需要備份的資料庫, 和備份檔案路徑 五丶點擊下一步, 選擇報告檔案保存路徑 六丶點擊下一步, 查看維護計劃, 沒 ......

    uj5u.com 2023-07-04 09:24:50 more
  • Mysql基礎篇(四)之事務

    # 一. 事務簡介 **事務是一組操作的集合,它是一個不可分隔的作業單位,事務會把所有的操作作為一個整體一起向系統提交或撤銷操作請求,即這些操作要么同時成功,要么同時失敗。** **就比如:張三給李四轉賬1000塊錢,張三銀行賬戶的錢減少了1000,而李四銀行賬戶的錢要增加1000。這一組操作就必須 ......

    uj5u.com 2023-07-04 09:24:11 more
  • 完全兼容DynamoDB協議!GaussDB(for Cassandra)為NoSQL注入新活力

    摘要:DynamoDB是一款托管式的NoSQL資料庫服務,支持多種資料模型,廣泛應用于電商、社交媒體、游戲、IoT等場景。 本文分享自華為云社區《完全兼容DynamoDB協議!GaussDB(for Cassandra)為NoSQL注入新活力》,作者:GaussDB 資料庫 。 DynamoDB是一 ......

    uj5u.com 2023-07-04 09:23:41 more