kafka原始碼主內容決議
一、原始碼環境準備
原始碼下載地址
安裝 JDK&Scala:
需要在 Windows本地安裝 JDK 8或者 JDK8 以上
版本,
需要在 Windows本地安裝 Scala2.12
,
加載原始碼:
將 kafka-3.0.0-src.tgz
原始碼包,解壓到非中文
目錄,
例如:D:\01_software\kafka-3.0.0-src,
打開 IDEA
,點擊 File->Open
…->原始碼包解壓的位置
,
安裝 gradle:
Gradle是類似于 maven 的代碼管理工具,安卓程式管理通常采用 Gradle,
IDEA 自動幫你下載安裝,下載的時間比較長(網路慢,需要 1 天時間,有 VPN 需要幾分鐘),
二、生產者原始碼
1、初始化
生產者sender執行緒初始化:
程式入口:
從用戶自己撰寫的 main 方法開始閱讀
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 連接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
:9092,hadoop103:9092");
// 指定對應的 key 和 value 的序列化型別 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Strin
gSerializer.class.getName());
// 1 創建 kafka 生產者物件
// "" hello
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);
// 2 發送資料
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","atguigu"+i));
}
// 3 關閉資源
kafkaProducer.close();
}
}
生產者 main 執行緒初始化:
點擊 main()方法中的 KafkaProducer()
,
KafkaProducer.java:
跳轉到 KafkaProducer構造方法:
生產者 sender 執行緒初始化:
發送資料到緩沖區:
發送總體流程:
磁區選擇:
發送訊息 大小 校驗:
記憶體池:
sender 執行緒 發送 資料:
三、消費者原始碼
消費者組初始化流程:
消費者組詳細消費流程:
消費者初始化:
程式入口:
消費者初始化:
消費者訂閱主題:
消費者拉取 和處理 資料:
消費 總體流程:
消費者/ 消費者組初始化:
拉取資料:
發送請求并抓取資料:
把資料按照磁區封裝好后,一次處理 最大條數認 默認 500 :
攔截器處理資料:
消費者 Offset 提交:
手動同步提交 Offset:
手動異步提交 Offset:
四、服務器原始碼
Kafka Broker總體作業流程:
程式入口:
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/438659.html
標籤:其他
上一篇:一小時學會用Opencv做貪吃蛇游戲(Python版)
下一篇:Likely root cause: java.nio.file.NoSuchFileException: /usr/local/es/plugins/ik/plugin-descriptor(踩坑)