日志分段切分條件
日志分段檔案切分包含以下4個條件,滿足其一即可:
- 當前日志分段檔案的大小超過了broker端引數
log.segment.bytes
配置的值,log.segment.bytes
引數的默認值為1073741824
,即1GB - 當前日志分段中訊息的最小時間戳與當前系統的時間戳的差值大于
log.roll.ms
或log.roll.hours
引數配置的值,如果同時配置了log.roll.ms
和log.roll.hours
引數,那么log.roll.ms
的優先級高,默認情況下,只配置了log.roll.hours
引數,其值為168,即7天, - 偏移量索引檔案或時間戳索引檔案的大小達到 broker 端引數
log.index.size.max.bytes
配置的值,log.index.size .max.bytes
的默認值為10485760
,即10MB - 追加的訊息的偏移量與當前日志分段的起始偏移量之間的差值大于
Integer.MAX_VALUE
, 即要追加的訊息的偏移量不能轉變為相對偏移量(offset - baseOffset > Integer.MAX_VALUE),
什么是Controller
Controller作為Kafka集群中的核心組件,它的主要作用是在Apache ZooKeeper的幫助下管理和協調整個Kafka集群,
Controller與Zookeeper進行互動,獲取與更新集群中的元資料資訊,其他broker并不直接與zookeeper進行通信,而是與Controller進行通信并同步Controller中的元資料資訊,
Kafka集群中每個節點都可以充當Controller節點,但集群中同時只能有一個Controller節點,
Controller簡單來說,就是kafka集群的狀態管理者
controller競選機制:簡單說,先來先上!
Broker 在啟動時,會嘗試去 ZooKeeper 中創建 /controller 節點,Kafka 當前選舉控制器的規則是:第一個成功創建 /controller 節點的 Broker 會被指定為控制器,
在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責維護整個集群中所有磁區和副本的狀態及磁區leader的選舉,
當某個磁區的leader副本出現故障時,由控制器負責為該磁區選舉新的leader副本,當檢測到某個磁區的ISR集合發生變化時,由控制器負責通知所有broker更新其元資料資訊,當使用kafka-topics.sh腳本為某個topic增加磁區數量時,同樣還是由控制器負責磁區的重新分配,
Kafka中的控制器選舉的作業依賴于Zookeeper,成功競選為控制器的broker會在Zookeeper中創建/controller這個臨時(EPHEMERAL)節點,此臨時節點的內容參考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定為1,brokerid表示成為控制器的broker的id編號,timestamp表示競選成為控制器時的時間戳,
在任意時刻,集群中有且僅有一個控制器,每個broker啟動的時候會去嘗試去讀取zookeeper上的/controller節點的brokerid的值,如果讀取到brokerid的值不為-1,則表示已經有其它broker節點成功競選為控制器,所以當前broker就會放棄競選;如果Zookeeper中不存在/controller這個節點,或者這個節點中的資料例外,那么就會嘗試去創建/controller這個節點,當前broker去創建節點的時候,也有可能其他broker同時去嘗試創建這個節點,只有創建成功的那個broker才會成為控制器,而創建失敗的broker則表示競選失敗,每個broker都會在記憶體中保存當前控制器的brokerid值,這個值可以標識為activeControllerId,
controller的職責
- 監聽partition相關變化
對Zookeeper中的/admin/reassign_partitions節點注冊PartitionReassignmentListener,用來處理磁區重分配的動作,
對Zookeeper中的/isr_change_notification節點注冊IsrChangeNotificetionListener,用來處理ISR集合變更的動作,
對Zookeeper中的/admin/preferred-replica-election節點添加PreferredReplicaElectionListener,用來處理優先副本選舉,
- 監聽topic增減變化
對Zookeeper中的/brokers/topics節點添加TopicChangeListener,用來處理topic增減的變化;
對Zookeeper中的/admin/delete_topics節點添加TopicDeletionListener,用來處理洗掉topic的動作
- 監聽broker相關的變化
對Zookeeper中的/brokers/ids/節點添加BrokerChangeListener,用來處理broker增減的變化
- 更新集群的元資料資訊
從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的資訊并進行相應的管理,
對各topic所對應的Zookeeper中的/brokers/topics/[topic]節點添加PartitionModificationsListener,用來監聽topic中的磁區分配變化,并將最新資訊同步給其他所有broker,
- 啟動并管理磁區狀態機和副本狀態機,
- 如果引數auto.leader.rebalance.enable設定為true,則還會開啟一個名為“auto-leader-rebalance-task”的定時任務來負責維護磁區的leader副本的均衡,
磁區的負載分布
客戶端請求創建一個topic時,每一個磁區副本在broker上的分配,是由集群controller來決定;
結論:里面會創建出來兩個亂數
第一個亂數確定0號磁區leader的位置,往后1號磁區2號磁區的leader依次往后順延1
第二個亂數確定每個磁區的第一個副本的位置 在leader所在機器上往后順延(亂數+1)臺機器,該臺機器就是第一個副本的位置,剩余副本依次往后順延1
// 舉例:
// broker_id = 0~19 一共20臺機器
// 磁區數20,副本數10
// 第一個亂數:19
// 第二個亂數:0
(0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8))
(1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
(2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
(3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
(4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
(5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
(6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
(7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16))
(9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17))
(10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18))
(11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19))
(12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0))
(13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1))
(14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2))
(15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3))
(16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4))
(17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5))
(18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6))
(19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7))
// 其分布策略原始碼如下:
private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //磁區的個數 10
replicationFactor: Int, //副本的個數 5
brokerList: Seq[Int],//broker的集合 8 0~7
fixedStartIndex: Int//默認值是-1 固定開始的索引位置
startPartitionId: Int): Map[Int, Seq[Int]] //默認值是-1 磁區開始的位置
= {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) {
fixedStartIndex
}else {
rand.nextInt(brokerArray.length)
}
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) {
fixedStartIndex
}else {
rand.nextInt(brokerArray.length)
}
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){
nextReplicaShift += 1
}
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1) {
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
-
副本因子不能大于 Broker 的個數(報錯:Replication factor: 4 larger than available brokers: 3.);
-
partition_0的第1個副本(leader副本)放置位置是隨機從 brokerList 選擇的;
-
其他磁區的第1個副本(leader)放置位置相對于paritition_0磁區依次往后移(也就是如果我們有5個 Broker,5個磁區,假設partition0磁區放在broker4上,那么partition1將會放在broker5上;patition2將會放在broker1上;partition3在broker2,依次類);
-
各磁區剩余的副本相對于磁區前一個副本偏移亂數nextReplicaShift+1,然后后面的副本依次加1
磁區Leader的選舉機制
磁區 leader 副本的選舉由控制器controller負責具體實施,
當創建磁區(創建主題或增加磁區都有創建磁區的動作)或Leader下線(此時磁區需要選舉一個新的leader上線來對外提供服務)的時候都需要執行 leader 的選舉動作,
選舉策略:按照 ISR集合中副本的順序查找第一個存活的副本,并且這個副本在 ISR 集合中
一個磁區的AR集合在partition分配的時候就被指定,并且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而磁區的 ISR 集合中副本的順序可能會改變;
生產者原理決議
生產者作業流程圖:
一個生產者客戶端由兩個執行緒協調運行,這兩個執行緒分別為主執行緒和 Sender 執行緒 ,
在主執行緒中由kafkaProducer創建訊息,然后通過可能的攔截器、序列化器和磁區器的作用之后快取到訊息累加器(RecordAccumulator, 也稱為訊息收集器)中,
Sender 執行緒負責從RecordAccumulator 獲取訊息并將其發送到 Kafka 中;
RecordAccumulator主要用來快取訊息以便Sender 執行緒可以批量發送,進而減少網路傳輸的資源消耗以提升性能,RecordAccumulator快取的大小可以通過生產者客戶端引數buffer.memory
配置,默認值為33554432B
,即32M,如果生產者發送訊息的速度超過發送到服務器的速度,則會導致生產者空間不足,這個時候 KafkaProducer.send()方法呼叫要么被阻塞,要么拋出例外,這個取決于引數max.block.ms
的配置,此引數的默認值為60000
,即60秒,
主執行緒中發送過來的訊息都會被迫加到 RecordAccumulator 的某個雙端佇列( Deque )中,
RecordAccumulator內部為每個磁區都維護了一個雙端佇列,即Deque<ProducerBatch>,
訊息寫入快取時,追加到雙端佇列的尾部;
Sender讀取訊息時,從雙端佇列的頭部讀取,注意:ProducerBatch 是指一個訊息批次;
與此同時,會將較小的 ProducerBatch 湊成一個較大 ProducerBatch ,也可以減少網路請求的次數以提升整體的吞吐量,
ProducerBatch 大小和batch.size
引數也有著密切的關系,當一條訊息(ProducerRecord ) 流入 RecordAccumulator 時,會先尋找與訊息磁區所對應的雙端佇列(如果沒有則新建),再從這個雙端佇列的尾部獲取一個ProducerBatch (如果沒有則新建),查看 ProducerBatch中是否還可以寫入這個ProducerRecord,如果可以寫入就直接寫入,如果不可以則需要創建一個新的Producer Batch,在新建 ProducerBatch時評估這條訊息的大小是否超過 batch.size
引數大小,如果不超過,那么就以 batch.size
引數的大小來創建 ProducerBatch,
如果生產者客戶端需要向很多磁區發送訊息, 則可以將buffer.memory引數適當調大以增加整體的吞吐量,
Sender從 RecordAccumulator 獲取快取的訊息之后,會進一步將<磁區,Deque<Producer Batch>的形式轉變成<Node,List<ProducerBatch>>的形式,其中Node表示Kafka集群broker節點,對于網路連接來說,生產者客戶端是與具體broker節點建立的連接,也就是向具體的broker節點發送訊息,而并不關心訊息屬于哪一個磁區;而對于KafkaProducer的應用邏輯而言,我們只關注向哪個磁區中發送哪些訊息,所以在這里需要做一個應用邏輯層面到網路I/O層面的轉換,
在轉換成<Node, List<ProducerBatch>>的形式之后, Sender會進一步封裝成<Node,Request> 的形式,這樣就可以將 Request 請求發往各個Node了,這里的Request是Kafka各種協議請求;
請求在從sender執行緒發往Kafka之前還會保存到InFlightRequests中,InFlightRequests保存物件的具體形式為 Map<Nodeld, Deque<request>>,它的主要作用是快取了已經發出去但還沒有收到服務端回應的請求(Nodeld 是一個 String 型別,表示節點的 id 編號),與此同時,InFlightRequests 還提供了許多管理類的方法,并且通過配置引數還可以限制每個連接(也就是客戶端與 Node之間的連接)最多快取的請求數,這個配置引數為 max.in.flight.request.per.connection
,默認值為5,即每個連接最多只能快取5個未回應的請求,超過該數值之后就不能再向這個連接發送更多的請求了,除非有快取的請求收到了回應( Response ),通過比較 Deque<Request> 的size與這個引數的大小來判斷對應的 Node中是否己經堆積了很多未回應的訊息,如果真是如此,那么說明這個 Node 節點負載較大或網路連接有問題,再繼續發送請求會增大請求超時的可能,
Producer往Broker發送訊息應答機制
kafka 在 producer 里面提供了訊息確認機制,我們可以通過配置來決定訊息發送到對應磁區的幾個副本才算訊息發送成功,可以在構造producer 時通過acks引數指定(在 0.8.2.X 前是通過 request.required.acks 引數設定的),這個引數支持以下三種值:
-
acks = 0:意味著如果生產者能夠通過網路把訊息發送出去,那么就認為訊息已成功寫入 kafka ,在這種情況下還是有可能發生錯誤,比如發送的物件不能被序列化或者網卡發生故障,但如果是磁區離線或整個集群長時間不可用,那就不會收到任何錯誤,在 acks=0 模式下的運行速度是非常快的(這就是為什么很多基準測驗都是基于這個模式),你可以得到驚人的吞吐量和帶寬利用率,不過如果選擇了這種模式,大概率會丟失一些訊息,
-
acks = 1:意味著leader 在收到訊息并把它寫入到磁區資料文件(不一定同步到磁盤上)時會回傳確認或錯誤回應,在這個模式下,如果發生正常的 leader 選舉,生產者會在選舉時收到一個 LeaderNotAvailableException 例外,如果生產者能恰當地處理這個錯誤,它會重試發送悄息,最終訊息會安全到達新的 leader 那里,不過在這個模式下仍然有可能丟失資料,比如訊息已經成功寫入 leader,但在訊息被復制到 follower 副本之前 leader發生崩潰,
-
acks = all(這個和 request.required.acks = -1 含義一樣):意味著 leader 在回傳確認或錯誤回應之前,會等待所有同步副本都收到悄息,如果和 min.insync.replicas 引數結合起來,就可以決定在回傳確認前至少有多少個副本能夠收到悄息,生產者會一直重試直到訊息被成功提交,不過這也是最慢的做法,因為生產者在繼續發送其他訊息之前需要等待所有副本都收到當前的訊息,
acks | 含義 |
---|---|
0 | Producer往集群發送資料不需要等到集群的確認資訊,不確保訊息發送成功,安全性最低但是效率最高, |
1 | Producer往集群發送資料只要 leader成功寫入訊息就可以發送下一條,只確保Leader 接收成功, |
-1 或 all | Producer往集群發送資料需要所有的ISR Follower 都完成從 Leader 的同步才會發送下一條,確保 Leader發送成功和所有的副本都成功接收,安全性最高,但是效率最低, |
生產者將acks設定為all,是否就一定不會丟資料呢?
否!如果在某個時刻ISR串列只剩leader自己了,那么就算acks=all,收到這條資料還是只有一個點;
可以配合另外一個引數緩解此情況: 最小同步副本數>=2
BROKER端引數: min.insync.replicas(默認1)
生產者的ack=all,也不能完全保證資料發送的100%可靠性
為什么?因為,如果服務端目標partition的同步副本只有leader自己了,此時,它收到資料就會給生產者反饋成功!
可以修改服務端的一個引數(磁區最小ISR數[min.insync.replicas]>=2),來避免此問題;
其他的生產者引數
- acks
acks是控制kafka服務端向生產者應答訊息寫入成功的條件;生產者根據得到的確認資訊,來判斷訊息發送是否成功;
- max.request.size
這個引數用來限制生產者客戶端能發送的訊息的最大值,默認值為 1048576B ,即 lMB
一般情況下,這個默認值就可以滿足大多數的應用場景了,
這個引數還涉及一些其它引數的聯動,比如 broker 端(topic級別引數)的 message.max.bytes引數(默認1000012),如果配置錯誤可能會引起一些不必要的例外;比如將 broker 端的 message.max.bytes 引數配置為10B ,而 max.request.size引數配置為20B,那么當發送一條大小為 15B 的訊息時,生產者客戶端就會報出例外;
- retries和retry.backoff.ms
retries引數用來配置生產者重試的次數,默認值為2147483647,即在發生例外的時候不進行任何重試動作,
訊息在從生產者發出到成功寫入服務器之前可能發生一些臨時性的例外,比如網路抖動、 leader 副本的選舉等,這種例外往往是可以自行恢復的,生產者可以通過配置 retries大于0的值,以此通過內部重試來恢復而不是一味地將例外拋給生產者的應用程式,如果重試達到設定的次數,那么生產者就會放棄重試并回傳例外,重試還和另一個引數 retry.backoff.ms 有關,這個引數的默認值為100,它用來設定兩次重試之間的時間間隔,避免無效的頻繁重試 ,如果將 retries引數配置為非零值,并且 max .in.flight.requests.per.connection 引數配置為大于1的值,那可能會出現錯序的現象:如果批次1訊息寫入失敗,而批次2訊息寫入成功,那么生產者會重試發送批次1的訊息,此時如果批次1的訊息寫入成功,那么這兩個批次的訊息就出現了錯序,
對于某些應用來說,順序性非常重要 ,比如MySQL binlog的傳輸,如果出現錯誤就會造成非常嚴重的后果;一般而言,在需要保證訊息順序的場合建議把引數max.in.flight.requests.per.connection 配置為1 ,而不是把retries配置為0,不過這樣也會影響整體的吞吐,
- compression.type
這個引數用來指定訊息的壓縮方式,默認值為“none",即默認情況下,訊息不會被壓縮,該引數還可以配置為 "gzip","snappy" 和 "lz4",對訊息進行壓縮可以極大地減少網路傳輸、降低網路I/O,從而提高整體的性能 ,訊息壓縮是一種以時間換空間的優化方式,如果對時延有一定的要求,則不推薦對訊息進行壓縮;
- batch.size
每個Batch要存放batch.size大小的資料后,才可以發送出去,比如說batch.size默認值是16KB,那么里面湊夠16KB的資料才會發送,理論上來說,提升batch.size的大小,可以允許更多的資料緩沖在recordAccumulator里面,那么一次Request發送出去的資料量就更多了,這樣吞吐量可能會有所提升,但是batch.size也不能過大,要是資料老是緩沖在Batch里遲遲不發送出去,那么發送訊息的延遲就會很高,一般可以嘗試把這個引數調節大些,利用生產環境發訊息負載測驗一下,
- linger.ms
這個引數用來指定生產者發送 ProducerBatch 之前等待更多訊息( ProducerRecord )加入
ProducerBatch 時間,默認值為0,生產者客戶端會在ProducerBatch填滿或等待時間超過linger.ms 值時發送出去,
增大這個引數的值會增加訊息的延遲,但是同時能提升一定的吞吐量,
- enable.idempotence
冪等性,就是一個操作重復做,也不會影響最終的結果!
int a = 1;
a++; // 非冪等操作
val map = new HashMap()
map.put(“a”,1); // 冪等操作
在kafka中,同一條訊息,生產者如果多次重試發送,在服務器中的結果如果還是只有一條,這就是具備冪等性;否則,就不具備冪等性!
- partitioner.class
用來指定磁區器,默認:org.apache.kafka.internals.DefaultPartitioner
默認磁區器的磁區規則:
- 如果資料中有key,則按key的murmur hash值 % topic磁區總數得到目標磁區
- 如果資料只有value,則在各個磁區間輪詢(老版本,新版本是new出來的一個亂數)
自定義partitioner需要實作org.apache.kafka.clients.producer.Partitioner介面
消費者組再均衡磁區分配策略
會觸發rebalance(消費者)的事件可能是如下任意一種:
- 有新的消費者加入消費組,
- 有消費者宕機下線,消費者并不一定需要真正下線,例如遇到長時間的 GC 、網路延遲導致消費者長時間未向GroupCoordinator發送心跳等情況時,GroupCoordinator 會認為消費者己下線,
- 有消費者主動退出消費組(發送LeaveGroupRequest 請求):比如客戶端呼叫了unsubscrible()方法取消對某些主題的訂閱,
- 消費組所對應的 GroupCoorinator節點發生了變更,
- 消費組內所訂閱的任一主題或者主題的磁區數量發生變化,
將磁區的消費權從一個消費者移到另一個消費者稱為再均衡(rebalance),如何rebalance也涉及到磁區分配策略,
kafka有兩種的磁區分配策略:range(默認) 和 roundrobin(新版本中又新增了另外2種)
我們可以通過partition.assignment.strategy
引數選擇 range 或 roundrobin,
partition.assignment.strategy
引數默認的值是range,
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
Range Strategy
- 先將消費者按照client.id字典排序,然后按topic逐個處理;
- 針對一個topic,將其partition總數/消費者數得到商n和 余數m,則每個consumer至少分到n個磁區,且前m個consumer每人多分一個磁區;
例1:
假設有TOPIC_A有5個磁區,由3個consumer(C1,C2,C3)來消費;5/3得到商1,余2,則每個消費者至少分1個磁區,前兩個消費者各多1個磁區C1: 2個磁區,C2:2個磁區,C3:1個磁區
接下來,就按照“區間”進行分配:
TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_A_3 TOPIC_A-4
C1: TOPIC_A-0, TOPIC_A-1
C2 : TOPIC_A-2, TOPIC_A_3
C3: TOPIC_A-4
例2:
假設TOPIC_A有5個磁區,TOPIC_B有3個磁區,由2個consumer(C1,C2)來消費
- 先分配TOPIC_A:
5/2得到商2,余1,則C1有3個磁區,C2有2個磁區,得到結果
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4
- 再分配TOPIC_B
3/2得到商1,余1,則C1有2個磁區,C2有1個磁區,得到結果
C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2
- 最終分配結果:
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
Round-Robin Strategy
- 將所有主題磁區組成TopicAndPartition串列,并對TopicAndPartition串列按照其hashCode 排序
- 然后,以輪詢的方式分配給各消費者
以上述“例2”來舉例:
- 先對TopicPartition的hashCode排序,假如排序結果如下:
TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
- 然后按輪詢方式分配
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1
C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3
C3 TOPIC_A-4
Sticky Strategy
對應的類叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特點:
- 要去達成最大化的均衡
- 盡可能保留各消費者原來分配的磁區
再均衡的程序中,還是會讓各消費者先取消自身的磁區,然后再重新分配(只不過是分配程序中會盡量讓原來屬于誰的磁區依然分配給誰)
Cooperative Sticky Strategy
對應的類叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特點:
- 邏輯與sticky策略一致
- 支持cooperative再均衡機制(再均衡的程序中,不會讓所有消費者取消掉所有磁區然后再進行重分配)
費者組再均衡流程
消費組在消費資料的時候,有兩個角色進行組內的各事務的協調;
角色1: Group Coordinator (組協調器) 位于服務端(就是某個broker)
組協調器的定位:
coordinator在我們組記偏移量的__consumer_offsets磁區的leader所在broker上
查找Group Coordinator的方式:
先根據消費組groupid的hashcode值計算它應該所在__consumer_offsets 中的磁區編號; 磁區數
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount為__consumer_offsets的磁區總數,這個可以通過broker端引數offset.topic.num.partitions來配置,默認值是50;
找到對應的磁區號后,再尋找此磁區leader副本所在broker節點,則此節點即為自己的Grouping Coordinator;
角色2: Group Leader (組長) 位于消費端(就是消費組中的某個消費者)
組長的定位:隨機選的哦!!!
GroupCoordinator介紹
每個消費組在服務端對應一個GroupCoordinator其進行管理,GroupCoordinator是Kafka服務端中用于管理消費組的組件,
消費者客戶端中由ConsumerCoordinator組件負責與GroupCoordinator行互動;
ConsumerCoordinator和GroupCoordinator最重要的職責就是負責執行消費者rebalance操作
再均衡監聽器
如果想控制消費者在發生再均衡時執行一些特定的作業,可以通過訂閱主題時注冊“再均衡監聽器”來實作;
場景舉例:在發生再均衡時,處理消費位移
如果A消費者消費掉的一批訊息還沒來得及提交offset,而它所負責的磁區在rebalance中轉移給了B消費者,則有可能發生資料的重復消費處理,此情形下,可以通過再均衡監聽器做一定程度的補救;
代碼示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
/**
* 消費組再均衡觀察
*/
public class ConsumerDemo2 {
public static void main(String[] args) {
//1.創建kafka的消費者物件,附帶著把組態檔搞定
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.訂閱主題(確定需要消費哪一個或者多個主題)
//我現在想看看如果我的消費者組里面,多了一個消費者或者少了一個消費者,他有沒有給我做再均衡
consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
/**
* 這個方法是將原來的分配情況全部取消,或者說把所有的磁區全部回收了
* 這個全部取消很惡心,原來的消費者消費的好好的,他一下子就給他全部停掉了
* @param collection
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
System.out.println("我原來的均衡情況是:"+collection + "我已經被回收了!!");
}
/**
* 這個方法是當上面的分配情況全部取消以后,呼叫這個方法,來再次分配,這是在均衡分配后的情況
* @param collection
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
System.out.println("我是重新分配后的結果:"+collection);
}
});
while (true){
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
}
}
}
kafka系統的CAP保證
CAP理論作為分布式系統的基礎理論,它描述的是一個分布式系統在以下三個特性中:
- 一致性(Consistency)
- 可用性(Availability)
- 磁區容錯性(Partition tolerance)
最多滿足其中的兩個特性,也就是下圖所描述的,分布式系統要么滿足CA,要么CP,要么AP,無法同時滿足CAP,
磁區容錯性:指的分布式系統中的某個節點或者網路磁區出現了故障的時候,整個系統仍然能對外提供滿足一致性和可用性的服務,也就是說部分故障不影響整體使用,事實上我們在設計分布式系統時都會考慮到bug,硬體,網路等各種原因造成的故障,所以即使部分節點或者網路出現故障,我們要求整個系統還是要繼續使用的(不繼續使用,相當于只有一個磁區,那么也就沒有后續的一致性和可用性了)
可用性:一直可以正常的做讀寫操作,簡單而言就是客戶端一直可以正常訪問并得到系統的正常回應,用戶角度來看就是不會出現系統操作失敗或者訪問超時等問題,
一致性:在分布式系統完成某寫操作后任何讀操作,都應該獲取到該寫操作寫入的那個最新的值,相當于要求分布式系統中的各節點時時刻刻保持資料的一致性,
Kafka 作為一個商業級訊息中間件,資料可靠性和可用性是優先考慮的重點,兼顧資料一致性;
參考檔案:https://www.cnblogs.com/lilpig/p/16840963.html
冪等性
冪等性要點
Kafka 0.11.0.0 版本開始引入了冪等性與事務這兩個特性,以此來實作 EOS ( exactly once semantics ,精確一次處理語意)
生產者在進行發送失敗后的重試時(retries),有可能會重復寫入訊息,而使用 Kafka冪等性功能之后就可以避免這種情況,
開啟冪等性功能,只需要顯式地將生產者引數 enable.idempotence
設定為 true (默認值為 false):
props.put("enable.idempotence",true);
在開啟冪等性功能時,如下幾個引數必須正確配置:
- retries > 0
- max.in.flight.requests.per.connection<=5
- acks = -1
如有違反,則會拋出ConfigException例外
;
kafka冪等性實作機制
- 每一個producer在初始化時會生成一個producer_id,并為每個目標磁區維護一個“訊息序列號”;
- producer每發送一條訊息,會將<producer_id,磁區>對應的“序列號”加1
- broker端會為每一對{producer_id,磁區}維護一個序列號,對于每收到的一條訊息,會判斷服務端的SN_OLD和接收到的訊息中的SN_NEW進行對比:
- 如果SN_OLD + 1 == SN_NEW,正常;
- 如果SN_NEW<SN_OLD+1,說明是重復寫入的資料,直接丟棄
- 如果SN_NEW>SN_OLD+1,說明中間有資料尚未寫入,或者是發生了亂序,或者是資料丟失,將拋出嚴重例外:OutOfOrderSequenceException
producer.send(“aaa”)
訊息aaa就擁有了一個唯一的序列號, 如果這條訊息發送失敗,producer內部自動重試(retry),此時序列號不變;
producer.send(“bbb”)
訊息bbb擁有一個新的序列號
注意:kafka只保證producer單個會話中的單個磁區冪等;
kafka事務(偽事務)
事務要點知識
- Kafka的事務控制原理
主要原理:
開始事務-->發送一個ControlBatch訊息(事務開始)
提交事務-->發送一個ControlBatch訊息(事務提交)
放棄事務-->發送一個ControlBatch訊息(事務終止)
- 開啟事務的必須配置引數(不支持資料得回滾,但是我能做到,一榮俱榮,一損俱損)
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// acks
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
// 生產者的重試次數
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
// 飛行中的請求快取最大數量
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
// 開啟冪等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
// 設定事務id
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"trans_001");
事務控制的代碼模板
// 初始化事務
producer.initTransaction( )
// 開啟事務
producer.beginTransaction( )
// 干活
// 提交事務
producer.commitTransaction( )
// 例外回滾(放棄事務) catch里面
producer.abortTransaction( )
消費者api是會拉取到尚未提交事務的資料的;只不過可以選擇是否讓用戶看到!
是否讓用戶看到未提交事務的資料,可以通過消費者引數來配置:
isolation.level=read_uncommitted(默認值)
isolation.level=read_committed
- kafka還有一個“高級”事務控制,只針對一種場景:
用戶的程式,要從kafka讀取源資料,資料處理的結果又要寫入kafka
kafka能實作端到端的事務控制(比起上面的“基礎”事務,多了一個功能,通過producer可以將consumer的消費偏移量系結到事務上提交)
producer.sendOffsetsToTransaction(offsets,consumer_id)
事務api示例
為了實作事務,應用程式必須提供唯一transactional.id,并且開啟生產者的冪等性
properties.put ("transactional.id","transactionid00001");
properties.put ("enable.idempotence",true);
“消費kafka-處理-生產結果到kafka”典型場景下的代碼結構示例:
package com.doit.day04;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Exercise_kafka2kafka {
public static void main(String[] args) {
Properties props = new Properties();
//消費者的
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shouwei");
//自動提交偏移量
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//寫生產者的一些屬性
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//設定ack 開啟冪等性必須設定的三個引數
props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
//開啟冪等性
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
//開啟事務
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"id_fro_39_19");
//消費資料
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//初始化事務
producer.initTransactions();
//訂閱主題
consumer.subscribe(Arrays.asList("eventlog"));
while (true){
//拉取資料
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
try {
//開啟事務
producer.beginTransaction();
for (ConsumerRecord<String, String> record : poll) {
String value = https://www.cnblogs.com/paopaoT/archive/2023/06/09/record.value();
//將value的值寫入到另外一個topic中
producer.send(new ProducerRecord("k2k",value));
}
producer.flush();
//提交偏移量
consumer.commitAsync();
//提交事務
producer.commitTransaction();
} catch (ProducerFencedException e) {
//放棄事務
producer.abortTransaction();
}
}
}
}
Kafka速度快的原因
- 訊息順序追加(磁盤順序讀寫比記憶體的隨機讀寫還快)
- 頁快取等技術(資料交給作業系統的頁快取,并不真正刷入磁盤;而是定期刷入磁盤)
使用Zero-Copy (零拷貝)技術來進一步提升性能;
零拷貝是指將資料直接從磁盤檔案復制到網卡設備中,而不需要經由應用程式之手;
零拷貝大大提高了應用程式的性能,減少了內核和用戶模式之間的背景關系切換;對于Linux系統而言,零拷貝技術依賴于底層的 sendfile( )方法實作;對應于Java 語言,FileChannal.transferTo( )方法的底層實作就是 sendfile( )方法;
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/554839.html
標籤:其他
上一篇:華為云新一代分布式資料庫GaussDB,給世界一個更優選擇
下一篇:返回列表