實時同步是 ChunJun 的?個重要特性,指在資料同步程序中,資料源與?標系統之間的資料傳輸和更新?乎在同?時間進?,
在實時同步場景中我們更加關注源端,當源系統中的資料發?變化時,這些變化會?即傳輸并應?到?標系統,以保證兩個系統中的資料保持?致,這個特性需要作業運?程序中 source 插件不間斷地頻繁訪問源端,在?產場景下,對于這類?時間運?、資源可預估、需要穩定性的作業,我們推薦使? perjob 模式部署,
插件?持 JSON 腳本和 SQL 腳本兩種配置?式,具體的引數配置請參考「ChunJun連接器檔案」:https://sourl.cn/vxq6Zp
本文將為大家介紹如何使用 ChunJun 實時同步,以及 ChunJun ?持的 RDB 實時采集插件的特性、采集邏輯及其原理,幫助大家更好地理解 ChunJun 與實時同步,
如何使用 ChunJun 實時同步
為了讓?家能更深?了解如何使? ChunJun 做實時同步,我們假設有這樣?個場景:?個電商?站希望將其訂單資料從 MySQL 資料庫實時同步到 HBase 資料庫,以便于后續的資料分析和處理,
在這個場景中,我們將使? Kafka 作為中間訊息佇列,以實作 MySQL 和 HBase 之間的資料同步,這樣做的好處是 MySQL 表中變更可以實時同步到 HBase 結果表中,?不?擔?歷史資料被修改后 HBase 表未被同步,
如果在?家的實際應用場景中,不關?歷史資料是否變更(或者歷史資料根本不會變更),且業務表有?個遞增的主鍵,那么可以參考本?之后的 JDBC-Polling 模式?節的內容,
· 資料源組件的部署以及 ChunJun 的部署這?不做詳細描述
· 案例中的腳本均以 SQL 腳本為例,JSON 腳本也能實作相同功能,但在引數名上可能存在出?,使? JSON 的同學可以參考上文 「ChunJun 連接器」?檔中的引數介紹
采集 MySQL 資料到 Kafka
● 資料準備
?先,我們在 Kafka 中創建?個名為 order_dml 的 topic,然后在 MySQL 中創建?個訂單表,并插??些測驗資料,創建表的 SQL 陳述句如下:
-- 創建?個名為ecommerce_db的資料庫,?于存盤電商?站的資料
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;
-- 創建?個名為orders的表,?于存盤訂單資訊
CREATE TABLE IF NOT EXISTS orders (
id INT AUTO_INCREMENT PRIMARY KEY, -- ?增主鍵
order_id VARCHAR(50) NOT NULL, -- 訂單編號,不能為空
user_id INT NOT NULL, -- ?戶ID,不能為空
product_id INT NOT NULL, -- 產品ID,不能為空
quantity INT NOT NULL, -- 訂購數量,不能為空
order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- 訂單?期,默認值為當前時間
戳,不能為空
);
-- 插??些測驗資料到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES ('ORD123', 1, 101, 2),
('ORD124', 2, 102, 1),
('ORD125', 3, 103, 3),
('ORD126', 1, 104, 1),
('ORD127', 2, 105, 5);
● 使用 Binlog 插件采集資料到 Kafka
為了表示資料的變化型別和更好地處理資料變化,實時采集插件一般會用 RowData(Flink 內部資料結構)中的 RowKind 記錄?志中的資料事件(insert、delete 等)型別,binlog 插件也?樣,而當資料被打到 Kafka 中時,RowKind 資訊應該怎么處理呢?
這?我們就需要?到 upsert-kafka-x,upsert-kafka-x 會識別 RowKind,對各類時間的處理邏輯如下:
? insert 資料:序列化后直接打?
? delete 資料:只寫 key,value 置為 null
? update 資料:分為?條 delete 資料和 insert 資料處理,即先根據主鍵洗掉原本的資料,再寫? update 后的資料
在下?步中我們再解釋如何將 Kafka 中的資料還原到 HBase 或者其他?持 upsert 語意的資料庫中,接下來我們來撰寫 SQL 腳本,實作 MySQL 資料實時采集到 Kafka 中的功能,示例如下:
CREATE TABLE binlog_source (
id int,
order_id STRING,
user_id INT,
product_id int,
quantity int,
order_date TIMESTAMP(3)
) WITH (
'connector' = 'binlog-x',
'username' = 'root',
'password' = 'root',
'cat' = 'insert,delete,update',
'url' = 'jdbc:mysql://localhost:3306/ecommerce_db?useSSL=false',
'host' = 'localhost',
'port' = '3306',
'table' = 'ecommerce_db.orders',
'timestamp-format.standard' = 'SQL',
'scan.parallelism' = '1'
);
CREATE TABLE kafka_sink (
id int,
order_id STRING,
user_id INT,
product_id int,
quantity int,
order_date TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka-x',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json',
'value.fields-include' = 'ALL',
'sink.parallelism' = '1'
);
insert into
kafka_sink
select
*
from
binlog_source u;
還原 Kafka 中的資料到 HBase
上述步驟中,我們通過 binlog-x 和 upsert-kafka-x,將 MySQL 中的資料實時采集到了 Kafka 中,解鈴還須系鈴?,我們可以通過 upsert-kafka-x 再去將 Kafka 中的資料決議成帶有 upsert 語意的資料,
upsert-kafka-x 作為 source 插件時,會判斷 Kafka 中資料的 value 是否為 null,如果 value 為 null 則標記這條資料的 RowKind 為 DELETE,否則將資料的 ROWKIND 標記為 INSERT,
ChunJun的 hbase-x 插件?前已經具備了 upsert 陳述句的能?,使? hbase-x 即可將 Kafka 中的資料還原到 hbase中,接下來是 SQL 腳本示例,為了?便在 HBase 中查看資料結果,我們將 int 資料 cast 為 string 型別:
CREATE TABLE kafka_source (
id int,
order_id STRING,
user_id INT,
product_id INT,
quantity INT,
order_date TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka-x',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test_group',
'key.format' = 'json',
'value.format' = 'json',
'scan.parallelism' = '1'
);
CREATE TABLE hbase_sink(
rowkey STRING, order_info ROW < order_id STRING,
user_id STRING,
product_id STRING,
quantity STRING,
order_date STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH(
-- 這?以hbase14為例,如果hbase版本是2.x,我們可以使?hbase2-x插件代替
'connector' = 'hbase14-x',
'zookeeper.quorum' = 'localhost:2181',
'zookeeper.znode.parent' = '/hbase',
'table-name' = 'ecommerce_db:orders',
'sink.parallelism' = '1'
);
INSERT INTO
hbase_sink
SELECT
cast(id as STRING),
ROW(
cast(order_id as STRING),
cast(user_id as STRING),
cast(product_id as STRING),
cast(quantity as STRING),
cast(order_date as STRING)
)
FROM
kafka_source
Tips:如果我們不需要 Kafka 中間件,也可以使? binlog-x 插件直接對接 hbase-x 插件,
ChunJun 支持的 RDB 實時采集插件
本節主要介紹 ChunJun 的 RDB 實時采集插件的特性、采集邏輯及其原理,
ChunJun 的 RDB 實時采集可以實時監視資料庫中的更改,并在發?更改時讀取資料變化,例如插?、更新和洗掉操作,使? ChunJun 實時采集,我們可以實時獲取有關資料庫中更改的資訊,從?能夠及時回應這些更改,如此便可以幫助我們更好地管理和利? RDB 資料庫中的資料,
并且 ChunJun 提供了故障恢復和斷點續傳功能來確保資料的完整性,ChunJun 實時采集類插件的?致實作步驟如下:
· 連接資料庫,確認讀取點位,讀取點位可以理解為?個 offset,如 Binlog 中,指?志的?件名和?件的 position 資訊
· 根據讀取點位開始讀取 redolog,獲取其中關于資料變更相關的操作記錄
· 根據 tableName、操作事件(如insert、delete、update)等過濾資訊過濾出需要的 log ?志
· 決議 log ?志,決議后的事件資訊包括表名、資料庫名、操作型別(插?、更新或洗掉)和變更的資料?等
· 將決議出來的資料會加?為 ChunJun 內部統?的 DdlRowData 供下游使?
ChunJun ?前已?持的實時采集 Connector 有:binlog(mysql)、oceanbasecdc、oraclelogminer、sqlservercdc,
Binlog 簡介
ChunJun binlog 插件的主要功能是讀取 MySQL 的?進制?志(binlog)?件,這些?件記錄了所有對資料的更改操作,如插?、更新和洗掉等,?前,該插件依賴 Canal 組件來讀取 MySQL 的 binlog ?件,
核?操作步驟如下:
? 確認讀取點位:在 binlog 插件中,我們可以在腳本的 start 欄位中直接指定 journal-name(binlog ?件名)和 position(?件的特定位置)
? 讀取binlog:binlog 插件將?身偽裝成 MySQL 的 Slave 節點,向 MySQL Master 發送請求,要求將 binlog ?件的資料流發送給它
? 故障恢復和斷點續傳:故障時,插件會記錄當前的 binlog 位置資訊,從 checkpoint/savepoint 恢復后,我們可以從上次記錄的位置繼續讀取 binlog ?件,確保資料變化的完整性
使? binlog 所需的權限在「binlog插件使??檔」中有詳細說明,鏈接如下:
https://sourl.cn/mvae9m
OracleLogminer 簡介
Logminer 插件借助 Oracle 提供的 Logminer ?具通過讀取視圖的?式獲取 Oracle redolog 中的資訊,
核?操作步驟如下:
01 定位需讀取起始點位(start_scn)
?前 logminer ?持四種策略指定 StartScn:
· all:從 Oracle 資料庫中最早的歸檔?志組開始采集(不建議使?)
· current:任務運?時的 SCN 號
· time:指定時間點對應的 SCN 號
· scn:直接指定 SCN 號
02 定位需要讀取的結束點位(end_scn)
插件根據 start_scn 和 maxLogFileSize(默認5G)獲取可加載的 redolog ?件串列,end_scn 取這個?件串列中最?的 scn 值,
03 加載 redo ?志到 Logminer
通過?個存盤程序,將 scn 區間范圍內的 redolog 加載到 Logminer ?,
04 從視圖中讀取資料
以 scn > ? 作為 where 條件直接查詢 v$logmnr_contents 視圖內的資訊即可獲取 redolog 中的資料,
05 重復1-4步驟,實作不斷的讀取
如標題,
06 故障恢復和斷點續傳
在發?故障時,插件會保存當前消費的 scn 號,重啟時從上次的 scn 號開始讀取,確保資料完整性,
? 關于該插件原理的詳細介紹請參?「Oracle Logminer 實作原理說明?檔」:
https://sourl.cn/6vqz4b
? 使?lominer插件的前提條件詳?「Oracle配置LogMiner」:
https://sourl.cn/eteyZY
SqlServerCDC 簡介
SqlServerCDC 插件依賴 SQL Server 的 CDC Agent 服務提供的視圖獲取 redolog 中的資訊,
核?操作步驟如下:
01 定位需讀取起始點位(from_lsn)
?前 SqlserverCDC 僅?持直接配置 lsn 號,如果 lsn 號未配置,則取資料庫中當前最?的 lsn 號為 from_lsn,
02 定位需要讀取的結束點位(to_lsn)
SqlserverCDC 插件定期地(可通過 pollInterval 引數指定)獲取資料庫中的最? lsn 為 end_lsn,
03 從視圖中讀取資料
查詢 Agent 服務提供的視圖中 lsn 區間范圍內的資料,過濾出需要監聽的表及事件型別,
04 重復1-3步驟,實作不斷的讀取
如標題,
05 故障恢復和斷點續傳
在發?故障時,插件會保存當前消費的 lsn 號,重啟時從上次的 lsn 號開始讀取,確保資料完整性,
? 關于該插件原理的詳細介紹請參?「Sqlserver CDC 實作原理說明?檔」:
https://sourl.cn/5pQvEM
? 配置 SqlServer CDC Agent 服務詳?「Sqlserver 配置 CDC ?檔」:
https://sourl.cn/h5nd8j
OceanBaseCDC 簡介
OceanBase 是螞蟻集團開源的?款分布式關系型資料庫,它使??進制?志(binlog)記錄資料變更,OceanBaseCDC 的實作依賴于 OceanBase 提供的 LogProxy 服務,LogProxy 提供了基于發布-訂閱模型的服務,允許使? OceanBase 的 logclient 訂閱特定的 binlog 資料流,
OceanBaseCDC 啟動?個 Listener 執行緒,當 logclient 連接到 LogProxy 后,Listener 會訂閱經過資料過濾的 binlog,然后將其添加到內部維護的串列中,當收到 COMMIT 資訊后,Listener 會將?志變更資訊傳遞給?個阻塞佇列,由主執行緒消費并將其轉換為 ChunJun 內部的 DdlRowData,最終發送到下游,
JDBC-Polling 模式讀
JDBC 插件的 polling 讀取模式是基于 SQL 陳述句做資料讀取的,相對于基于重做?志的實時采集成本更低,但 jdbc 插件做實時同步對業務場景有更?的要求:
· 有?個數值型別或者時間型別的遞增主鍵
· 不更新歷史資料或者不關?歷史資料是否更新,僅關?新資料的獲取
實作原理簡介
? 設定遞增的業務主鍵作為 polling 模式依賴的增量鍵
? 在增量讀取的程序中,實時記錄 increColumn 對應的值(state),作為下?次資料讀取的起始點位
? 當?批資料讀取完后,間隔?段時間之后依據 state 讀取下?批資料
polling 依賴部分增量同步的邏輯,關于增量同步的更多介紹可以點擊:
https://sourl.cn/UC8n6K
如何配置?個 jdbc-polling 作業
先介紹?下開啟 polling 模式需要關注的配置項:
以 MySQL 為例,假設我們有?個存盤訂單資訊的歷史表,且訂單的 order_id 是遞增的,我們希望定期地獲取這張表的新增資料,
CREATE TABLE order.realtime_order_archive (
order_id INT PRIMARY KEY COMMENT '訂單唯?標識',
customer_id INT COMMENT '客戶唯?標識',
product_id INT COMMENT '產品唯?標識',
order_date TIMESTAMP COMMENT '訂單?期和時間',
payment_method VARCHAR(255) COMMENT '?付?式(信?卡、?付寶、微信?付等)',
shipping_method VARCHAR(255) COMMENT '配送?式(順豐速運、圓通速遞等)',
shipping_address VARCHAR(255) COMMENT '配送地址',
order_total DECIMAL(10,2) COMMENT '訂單總?額',
discount DECIMAL(10,2) COMMENT '折扣?額',
order_status VARCHAR(255) COMMENT '訂單狀態(已完成、已取消等)'
);
我們可以這樣配置 json 腳本的 reader 資訊,
"name": "mysqlreader",
"parameter": {
"column" : [
"*" //這?假設我們讀取所有欄位,可以填寫‘*’
],
"increColumn": "id",
"polling": true,
"pollingInterval": 3000,
"username": "username",
"password": "password",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://ip:3306/liuliu?useSSL=false"
],
"schema":"order",
"table": [
"realtime_order_archive" ]
}
]
}
}
《數堆疊產品白皮書》:https://fs80.cn/cw0iw1
《資料治理行業實踐白皮書》下載地址:https://fs80.cn/380a4b
想了解或咨詢更多有關袋鼠云大資料產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠云官網:https://www.dtstack.com/?src=https://www.cnblogs.com/DTinsight/p/szbky
同時,歡迎對大資料開源專案有興趣的同學加入「袋鼠云開源框架釘釘技術qun」,交流最新開源技術資訊,qun號碼:30537511,專案地址:https://github.com/DTStack
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/551129.html
標籤:大數據
上一篇:輕松拿下PostgreSQL,這30個實用SQL陳述句你細品
下一篇:返回列表