尊重原創著作權: https://www.gewuweb.com/hot/18198.html
百度十年架構師帶你深入剖析RocketMQ原始碼-NameServer
尊重原創著作權:https://www.gewuweb.com/sitemap.html
一、RocketMQ架構簡介
1.1 邏輯部署圖
(圖片來自網路)
1.2 核心組件說明
通過上圖可以看到,RocketMQ的核心組件主要包括4個,分別是NameServer、Broker、Producer和Consumer,下面我們先依次簡單說明下這四個核心組件:
** NameServer ** :NameServer充當路由資訊的提供者,生產者或消費者能夠通過NameServer查找各Topic相應的Broker
IP串列,多個Namesrver實體組成集群,但相互獨立,沒有資訊交換,
** Broker **
:訊息中轉角色,負責存盤訊息、轉發訊息,Broker服務器在RocketMQ系統中負責接收從生產者發送來的訊息并存盤、同時為消費者的拉取請求作準備,Broker服務器也存盤訊息相關的元資料,包括消費者組、消費進度偏移和主題和佇列訊息等,
** Producer **
:負責生產訊息,一般由業務系統負責生產訊息,一個訊息生產者會把業務應用系統里產生的訊息發送到Broker服務器,RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送,同步和異步方式均需要Broker回傳確認資訊,單向發送不需要,
** Consumer **
:負責消費訊息,一般是后臺系統負責異步消費,一個訊息消費者會從Broker服務器拉取訊息、并將其提供給應用程式,從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費,
除了上面說的三個核心組件外,還有Topic這個概念下面也會多次提到:
** Topic **
:表示一類訊息的集合,每個Topic包含若干條訊息,每條訊息只能屬于一個Topic,是RocketMQ進行訊息訂閱的基本單位,一個Topic可以分片在多個Broker集群上,每一個Topic分片包含多個queue,具體結構可以參考下圖:
1.3 設計理念
RocketMQ是基于主題的發布與訂閱模式,核心功能包括訊息發送、訊息存盤、訊息消費,整體設計追求簡單與性能第一,歸納來說主要是下面三種:
- NameServer取代ZK充當注冊中心,NameServer集群間互不通信,容忍路由資訊在集群內分鐘級不一致,更加輕量級;
- 使用記憶體映射機制實作高效的IO存盤,達到高吞吐量;
- 容忍設計缺陷,通過ACK確保訊息至少消費一次,但是如果ACK丟失,可能訊息重復消費,這種情況設計上允許,交給使用者自己保證,
本文重點介紹的就是NameServer,我們下面一起來看下NameServer是如何啟動以及如何進行路由管理的,
二、NameServer架構設計
在第一章已經簡單介紹了NameServer取代zk作為一種更輕量級的注冊中心充當路由資訊的提供者,那么具體是如何來實作路由資訊管理的呢?我們先看下圖:
上面的圖描述了NameServer進行路由注冊、路由剔除和路由發現的核心原理,
** 路由注冊 **
:Broker服務器在啟動的時候會想NameServer集群中所有的NameServer發送心跳信號進行注冊,并會每隔30秒向nameserver發送心跳,告訴NameServer自己活著,NameServer接收到Broker發送的心跳包之后,會記錄該broker資訊,并保存最近一次收到心跳包的時間,
** 路由剔除 **
:NameServer和每個Broker保持長連接,每隔30秒接收Broker發送的心跳包,同時自身每個10秒掃描BrokerLiveTable,比較上次收到心跳時間和當前時間比較是否大于120秒,如果超過,那么認為Broker不可用,剔除路由表中該Broker相關資訊,
** 路由發現 **
:路由發現不是實時的,路由變化后,NameServer不主動推給客戶端,等待producer定期拉取最新路由資訊,這樣的設計方式降低了NameServer實作的復雜性,當路由發生變化時通過在訊息發送端的容錯機制來保證訊息發送的高可用(這塊內容會在后續介紹producer訊息發送時介紹,本文不展開講解),
** 高可用 **
:NameServer通過部署多臺NameServer服務器來保證自身的高可用,同時多個NameServer服務器之間不進行通信,這樣路由資訊發生變化時,各個NameServer服務器之間資料可能不是完全相同的,但是通過發送端的容錯機制保證訊息發送的高可用,這個也正是NameServer追求簡單高效的目的所在,
三、 啟動流程
在整理了解了NameServer的架構設計之后,我們先來看下NameServer到底是如何啟動的呢?
既然是原始碼解讀,那么我們先來看下代碼入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[]
args),實際呼叫的是main0()方法,
代碼如下:
public static NamesrvController main0(String[] args) {
try {
//創建namesrvController
NamesrvController controller = createNamesrvController(args);
//初始化并啟動NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
?
return null;
}
通過main方法啟動NameServer,主要分為兩大步,先創建NamesrvController,然后再初始化并啟動NamesrvController,我們分別展開來分析,
3.1 時序圖
具體展開閱讀代碼之前,我們先通過一個序列圖對整體流程有個了解,如下圖:
3.2 創建NamesrvController
先來看核心代碼,如下:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 設定版本號為當前版本號
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
//構造org.apache.commons.cli.Options,并添加-h -n引數,-h引數是列印幫助資訊,-n引數是指定namesrvAddr
Options options = ServerUtil.buildCommandlineOptions(new Options());
//初始化commandLine,并在options中添加-c -p引數,-c指定nameserver的組態檔路徑,-p標識列印配置資訊
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//nameserver配置類,業務引數
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//netty服務器配置類,網路引數
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//設定nameserver的埠號
nettyServerConfig.setListenPort(9876);
//命令帶有-c引數,說明指定組態檔,需要根據組態檔路徑讀取組態檔內容,并將檔案中配置資訊賦值給NamesrvConfig和NettyServerConfig
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//反射的方式
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
//設定組態檔路徑
namesrvConfig.setConfigStorePath(file);
?
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//命令列帶有-p,說明是列印引數的命令,那么就列印出NamesrvConfig和NettyServerConfig的屬性,在啟動NameServer時可以先使用./mqnameserver -c configFile -p列印當前加載的配置屬性
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
//列印引數命令不需要啟動nameserver服務,只需要列印引數即可
System.exit(0);
}
//決議命令列引數,并加載到namesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//檢查ROCKETMQ_HOME,不能為空
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//初始化logback日志工廠,rocketmq默認使用logback作為日志輸出
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
?
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
?
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//創建NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
?
//將全域Properties的內容復制到NamesrvController.Configuration.allConfigs中
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
?
return controller;
}
通過上面對每一行代碼的注釋,可以看出來,創建NamesrvController的程序主要分為兩步:
Step1:通過命令列中獲取配置,賦值給NamesrvConfig和NettyServerConfig類,
Step2:根據配置類NamesrvConfig和NettyServerConfig構造一個NamesrvController實體,
可見NamesrvConfig和NettyServerConfig是想當重要的,這兩個類分別是NameServer的業務引數和網路引數,我們分別看下這兩個類里面有哪些屬性:
** NamesrvConfig **
** NettyServerConfig **
注:Apache Commons CLI是開源的命令列決議工具,它可以幫助開發者快速構建啟動命令,并且幫助你組織命令的引數、以及輸出串列等,
3.3 初始化并啟動
創建了NamesrvController實體之后,開始初始化并啟動NameServer,
首先進行初始化,代碼入口是NamesrvController#initialize,
public boolean initialize() {
//加載kvConfigPath下kvConfig.json組態檔里的KV配置,然后將這些配置放到KVConfigManager#configTable屬性中
this.kvConfigManager.load();
//根據nettyServerConfig初始化一個netty服務器,
//brokerHousekeepingService是在NamesrvController實體化時建構式里實體化的,該類負責Broker連接事件的處理,實作了ChannelEventListener,主要用來管理RouteInfoManager的brokerLiveTable
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//初始化負責處理Netty網路互動資料的執行緒池,默認執行緒數是8個
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注冊Netty服務端業務處理邏輯,如果開啟了clusterTest,那么注冊的請求處理類是ClusterTestRequestProcessor,否則請求處理類是DefaultRequestProcessor
this.registerProcessor();
//注冊心跳機制執行緒池,延遲5秒啟動,每隔10秒遍歷RouteInfoManager#brokerLiveTable這個屬性,用來掃描不存活的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//注冊列印KV配置執行緒池,延遲1分鐘啟動、每10分鐘列印出kvConfig配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//rocketmq可以通過開啟TLS來提高資料傳輸的安全性,如果開啟了,那么需要注冊一個監聽器來重新加載SslContext
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
?
return true;
}
上面的代碼是NameServer初始化流程,通過每行代碼的注釋,可以看出來,主要有5步驟操作:
- Step1:加載KV配置,并寫入到KVConfigManager的configTable屬性中;
- Step2:初始化netty服務器;
- Step3:初始化處理netty網路互動資料的執行緒池;
- Step4:注冊心跳機制執行緒池,啟動5秒后每隔10秒檢測一次Broker的存活情況;
- Step5:注冊列印KV配置的執行緒池,啟動1分鐘后,每隔10分鐘列印一次KV配置,
RocketMQ的開發團隊還使用了一個常用的編程技巧,就是使用JVM鉤子函式對NameServer進行優雅停機,這樣在JVM行程關閉前,會先執行shutdown操作,
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
執行start函式,啟動NameServer,代碼比較簡單,就是將第一步中創建的netty
server進行啟動,其中remotingServer.start()方法不展開詳細說明了,需要對netty比較熟悉,不是本篇文章重點,有興趣的同學可以自行下載原始碼閱讀,
public void start() throws Exception {
//啟動netty服務
this.remotingServer.start();
//如果開啟了TLS
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
四、路由管理
我們在第二章開篇有了解到NameServer作為一個輕量級的注冊中心,主要是為訊息生產者和消費者提供Topic的路由資訊,并對這些路由資訊和Broker節點進行管理,主要包括路由注冊、路由剔除和路由發現,
本章將會通過原始碼的角度來具體分析NameServer是如果進行路由資訊管理的,核心代碼主要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中實作,
4.1 路由元資訊
在了解路由資訊管理之前,我們首先需要了解NameServer到底存盤了哪些路由元資訊,資料結構分別是什么樣的,
查看代碼我們可以看到主要通過5個屬性來維護路由元資訊,如下:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
我們依次對這5個屬性進行展開說明,
4.1.1 TopicQueueTable
說明:Topic訊息佇列路由資訊,訊息發送時根據路由表進行負載均衡,
資料結構:HashMap結構,key是Topic名字,value是一個型別是QueueData的佇列集合,在第一章就講過,一個Topic中有多個佇列,QueueData的資料結構如下:
** 資料結構示例: **
topicQueueTable:{
"topic1": [
{
"brokerName": "broker-a",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSynFlag":0,
},
{
"brokerName": "broker-b",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSynFlag":0,
}
]
}
4.1.2 BrokerAddrTable
說明:Broker基礎資訊,包含BrokerName、所屬集群名稱、主備Broker地址,
** 資料結構 **
:HashMap結構,key是BrokerName,value是一個型別是BrokerData的物件,BrokerData的資料結構如下(可以結合下面Broker主從結構邏輯圖來理解):
Broker主從結構邏輯圖:
資料結構示例:
brokerAddrTable:{
"broker-a": {
"cluster": "c1",
"brokerName": "broker-a",
"brokerAddrs": {
0: "192.168.1.1:10000",
1: "192.168.1.2:10000"
}
},
"broker-b": {
"cluster": "c1",
"brokerName": "broker-b",
"brokerAddrs": {
0: "192.168.1.3:10000",
1: "192.168.1.4:10000"
}
}
}
4.1.3 ClusterAddrTable
說明:Broker集群資訊,存盤集群中所有Broker名稱,
資料結構:HashMap結構,key是ClusterName,value是存盤BrokerName的Set結構,
資料結構示例:
clusterAddrTable:{
"c1": ["broker-a","broker-b"]
}
4.1.4 BrokerLiveTable
說明:Broker狀態資訊,NameServer每次收到心跳包時會替換該資訊
** 資料結構 **
:HashMap結構,key是Broker的地址,value是BrokerLiveInfo結構的該Broker資訊物件,BrokerLiveInfo的資料結構如下:
資料結構示例:
brokerLiveTable:{
"192.168.1.1:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":""
},
"192.168.1.2:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":"192.168.1.1:10000"
},
"192.168.1.3:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":""
},
"192.168.1.4:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":"192.168.1.3:10000"
}
}
4.1.5 filterServerTable
說明:Broker上的FilterServer串列,訊息過濾服務器串列,后續介紹Consumer時會介紹,consumer拉取資料是通過filterServer拉取,consumer向Broker注冊,
** 資料結構 ** :HashMap結構,key是Broker地址,value是記錄了filterServer地址的List集合,
4.2 路由注冊
路由注冊是通過Broker和NameServer之間的心跳功能來實作的,主要分為兩步:
** Step1: **
Broker啟動時向集群中所有NameServer發送心跳陳述句,每隔30秒(默認30s,時間間隔在10秒到60秒之間)再發一次,
** Step2: **
NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable,
我們分別展開分析這兩步,
4.2.1 Broker發送心跳包
發送心跳包的核心邏輯是在Broker啟動邏輯里,代碼入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重點關注的是發送心跳包的邏輯實作,只列出發送心跳包的核心代碼,如下:
1)創建了一個執行緒池注冊Broker,程式啟動10秒后執行,每隔30秒(默認30s,時間間隔在10秒到60秒之間,BrokerConfig.getRegisterNameServerPeriod()的默認值是30秒)執行一次,
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
2)封裝Topic配置和版本號之后,進行實際的路由注冊(注:封裝Topic配置不是本篇重點,會在介紹Broker原始碼時重點講解),實際路由注冊是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中實作,核心代碼如下:
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
?
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
//獲取nameserver地址串列
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
/**
*封裝請求包頭start
*封裝請求包頭,主要封裝broker相關資訊
**/
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
//封裝requestBody,包括topic和filterServerList相關資訊
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
/**
*封裝請求包頭end
**/
//開啟多執行緒到每個nameserver進行注冊
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//實際進行注冊方法
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
//封裝nameserver回傳的資訊
registerBrokerResultList.add(result);
}
?
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
?
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
?
return registerBrokerResultList;
}
從上面代碼來看,也比較簡單,首先需要封裝請求包頭和requestBody,然后開啟多執行緒到每個NameServer服務器去注冊,
請求包頭型別為RegisterBrokerRequestHeader,主要包括如下欄位:
requestBody型別是RegisterBrokerBody,主要包括如下欄位:
1)實際的路由注冊是通過registerBroker方法實作,核心代碼如下:
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
//創建請求指令,需要注意RequestCode.REGISTER_BROKER,nameserver端的網路處理器會根據requestCode進行相應的業務處理
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
//基于netty進行網路傳輸
if (oneway) {
//如果是單向呼叫,沒有回傳值,不回傳nameserver回傳結果
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//異步呼叫向nameserver發起注冊,獲取nameserver的回傳資訊
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
//獲取回傳的reponseHeader
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
//重新封裝回傳結果,更新masterAddr和haServerAddr
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
?
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
borker和NameServer之間通過netty進行網路傳輸,Broker向NameServer發起注冊時會在請求中添加注冊碼RequestCode.REGISTER_BROKER,這是一種網路跟蹤方法,RocketMQ的每個請求都會定義一個requestCode,服務端的網路處理器會根據不同的requestCode進行影響的業務處理,
4.2.2 NameServer處理心跳包
Broker發出路由注冊的心跳包之后,NameServer會根據心跳包中的requestCode進行處理,NameServer的默認網路處理器是DefaultRequestProcessor,具體代碼如下:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
......
//,如果是RequestCode.REGISTER_BROKER,進行broker注冊
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
......
default:
break;
}
return null;
}
判斷requestCode,如果是RequestCode.REGISTER_BROKER,那么確定業務處理邏輯是注冊Broker,根據Broker版本號選擇不同的方法,我們已V3_0_11以上為例,呼叫registerBrokerWithFilterServer方法進行注冊主要步驟分為三步:
** Step1 ** :
決議requestHeader并驗簽(基于crc32),判斷資料是否正確;
** Step2 ** :
決議Topic資訊;
** Step3 ** :
呼叫RouteInfoManager#registerBroker來進行Broker注冊;
核心注冊邏輯是由RouteInfoManager#registerBroker來實作,核心代碼如下:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
//加寫鎖,防止并發寫RoutInfoManager中的路由表資訊,
this.lock.writeLock().lockInterruptibly();
//根據clusterName從clusterAddrTable中獲取所有broker名字集合
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
//如果沒有獲取到,說明broker所屬集群還沒記錄,那么需要創建,并將brokerName加入到集群的broker集合中
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//根據brokerName嘗試從brokerAddrTable中獲取brokerData
BrokerData brokerData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
//如果沒獲取到brokerData,新建BrokerData并放入brokerAddrTable,registerFirst設為true;
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap());
this.brokerAddrTable.put(brokerName, brokerData);
}
//更新brokerData中的brokerAddrs
Map brokerAddrsMap = brokerData.getBrokerAddrs();
//考慮到可能出現master掛了,slave變成master的情況,這時候brokerId會變成0,這時候需要把老的brokerAddr給洗掉
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
//更新brokerAddrs,根據回傳的oldAddr判斷是否是第一次注冊的broker
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
?
//如過Broker是Master,并且Broker的Topic配置資訊發生變化或者是首次注冊,需要創建或更新Topic路由元資料,填充topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry entry : tcTable.entrySet()) {
//創建或更新Topic路由元資料
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
//更新BrokerLivelnfo,BrokeLivelnfo是執行路由洗掉的重要依據
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
//注冊Broker的filterServer地址串列
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
//如果此Broker為從節點,則需要查找Broker Master的節點資訊,并更新對應masterAddr屬性
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
?
return result;
}
通過上面的原始碼分析,可以分解出一個Broker的注冊主要分7步:
- Step1:加寫鎖,防止并發寫RoutInfoManager中的路由表資訊;
- Step2:判斷Broker所屬集群是否存在,不存在需要創建,并將Broker名加入到集群Broker集合中;
- Step3:維護BrokerData;
- Step4:如過Broker是Master,并且Broker的Topic配置資訊發生變化或者是首次注冊,需要創建或更新Topic路由元資料,填充TopicQueueTable;
- Step5:更新BrokerLivelnfo;
- Step6:注冊Broker的filterServer地址串列;
- Step7:如果此Broker為從節點,則需要查找Broker Master的節點資訊,并更新對應masterAddr屬性,并回傳給Broker端,
4.3 路由剔除
4.3.1 觸發條件
路由剔除的觸發條件主要有兩個:
NameServer每隔10s掃描BrokerLiveTable,連續120s沒收到心跳包,則移除該Broker并關閉socket連接;
Broker正常關閉時觸發路由洗掉,
4.3.2 原始碼決議
上面描述的觸發點最終洗掉路由的邏輯是一樣的,統一在RouteInfoManager#onChannelDestroy
中實作,核心代碼如下:
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
//加讀鎖
this.lock.readLock().lockInterruptibly();
//通過channel從brokerLiveTable中找出對應的Broker地址
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
//釋放讀鎖
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
//若該Broker已經從存活的Broker地址串列中被清除,則直接使用remoteAddr
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
?
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
?
try {
try {
//申請寫鎖
this.lock.writeLock().lockInterruptibly();
//根據brokerAddress,將這個brokerAddress從brokerLiveTable和filterServerTable中移除
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
//遍歷brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/itBrokerAddrTable.next().getValue();
?
Iterator> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
//根據brokerAddress找到對應的brokerData,并將brokerData中對應的brokerAddress移除
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
//如果移除后,整個brokerData的brokerAddress空了,那么將整個brokerData移除
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
?
if (brokerNameFound != null && removeBrokerName) {
//遍歷clusterAddrTable
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
//根據第三步中獲取的需要移除的brokerName,將對應的brokerName移除了
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
//如果移除后,該集合為空,那么將整個集群從clusterAddrTable中移除
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
?
break;
}
}
}
?
if (removeBrokerName) {
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
//遍歷topicQueueTable
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
?
Iterator<QueueData> itQueueData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
//根據brokerName,將topic下對應的broker移除掉
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
//如果該topic下只有一個待移除的broker,那么該topic也從table中移除
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
//釋放寫鎖
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
路由洗掉整體邏輯主要分為6步:
- Step1:加readlock,通過channel從BrokerLiveTable中找出對應的Broker地址,釋放readlock,若該Broker已經從存活的Broker地址串列中被清除,則直接使用remoteAddr,
- Step2:申請寫鎖,根據BrokerAddress從BrokerLiveTable、filterServerTable移除,
- Step3:遍歷BrokerAddrTable,根據BrokerAddress找到對應的brokerData,并將brokerData中對應的brokerAddress移除,如果移除后,整個brokerData的brokerAddress空了,那么將整個brokerData移除,
- Step4:遍歷clusterAddrTable,根據第三步中獲取的需要移除的BrokerName,將對應的brokerName移除了,如果移除后,該集合為空,那么將整個集群從clusterAddrTable中移除,
- Step5:遍歷TopicQueueTable,根據BrokerName,將Topic下對應的Broker移除掉,如果該Topic下只有一個待移除的Broker,那么該Topic也從table中移除,
- Step6:釋放寫鎖,
從上面可以看出,路由剔除的整體邏輯比較簡單,就是單純地針對路由元資訊的資料結構進行操作,為了大家能夠更好地理解這塊代碼,建議大家對照4.1中介紹的路由元資訊的資料結構來進行代碼走讀,
4.4 路由發現
當路由資訊發生變化之后,NameServer不會主動推送給客戶端,而是等待客戶端定期到nameserver主動拉取最新路由資訊,這種設計方式降低了NameServer實作的復雜性,
4.4.1 producer主動拉取
producer在啟動后會開啟一系列定時任務,其中有一個任務就是定期從NameServer獲取Topic路由資訊,代碼入口是MQClientInstance#start-
ScheduledTask(),核心代碼如下:
private void startScheduledTask() {
......
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
@Override
public void run() {
try {
//從nameserver更新最新的topic路由資訊
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
?
......
}
?
/**
* 從nameserver獲取topic路由資訊
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
//向nameserver發送請求包,requestCode為RequestCode.GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
......
}
producer和NameServer之間通過netty進行網路傳輸,producer向NameServer發起的請求中添加注冊碼
RequestCode.GET_ROUTEINFO_BY_TOPIC,
4.4.2 NameServer回傳路由資訊
NameServer收到producer發送的請求后,會根據請求中的requestCode進行處理,處理requestCode同樣是在默認的網路處理器DefaultRequestProcessor中進行處理,最終通過RouteInfoManager#pickupTopicRouteData來實作,
** TopicRouteData結構 **
在正式決議原始碼前,我們先看下NameServer回傳給producer的資料結構,通過代碼可以看到,回傳的是一個TopicRouteData物件,具體結構如下:
其中QueueData,BrokerData,filterServerTable在4.1章節介紹路由元資訊時有介紹,
** 原始碼分析 **
在了解了回傳給producer的TopicRouteData結構后,我們進入RouteInfoManager#pickupTopicRouteData方法來看下具體如何實作,
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set brokerNameSet = new HashSet();
List brokerDataList = new LinkedList();
topicRouteData.setBrokerDatas(brokerDataList);
?
HashMap> filterServerMap = new HashMap>();
topicRouteData.setFilterServerTable(filterServerMap);
?
try {
try {
//加讀鎖
this.lock.readLock().lockInterruptibly();
//從元資料topicQueueTable中根據topic名字獲取佇列集合
List queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
//將獲取到的佇列集合寫入topicRouteData的queueDatas中
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
?
Iterator it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
//遍歷從QueueData集合中提取的brokerName
for (String brokerName : brokerNameSet) {
//根據brokerName從brokerAddrTable獲取brokerData
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
//克隆brokerData物件,并寫入到topicRouteData的brokerDatas中
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
//遍歷brokerAddrs
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
//根據brokerAddr獲取filterServerList,封裝后寫入到topicRouteData的filterServerTable中
List filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
//釋放讀鎖
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
?
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
?
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
?
return null;
}
上面代碼封裝了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,還有orderTopicConf欄位沒封裝,我們再看下這個欄位是在什么時候封裝的,我們向上看RouteInfoManager#pickupTopicRouteData的呼叫方法DefaultRequestProcessor#getRouteInfoByTopic如下:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
......
//這塊代碼就是上面決議的代碼,獲取到topicRouteData物件
TopicRouteData topicRouteData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
?
if (topicRouteData != null) {
//判斷nameserver的orderMessageEnable配置是否打開
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
//如果配置打開了,根據namespace和topic名字獲取kvConfig組態檔中順序訊息配置內容
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
//封裝orderTopicConf
topicRouteData.setOrderTopicConf(orderTopicConf);
}
?
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
//如果沒有獲取到topic路由,那么reponseCode為TOPIC_NOT_EXIST
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
結合這兩個方法,我們可以總結出查找Topic路由主要分為3個步驟:
呼叫RouteInfoManager#pickupTopicRouteData,從topicQueueTable,brokerAddrTabl,filterServerTable中獲取資訊,分別填充queue-
Datas、BrokerDatas、filterServerTable,如果topic為順序訊息,那么從KVconfig中獲取關于順序訊息先關的配置填充到orderTopicConf中,
如果找不到路由資訊,那么回傳code為ResponseCode.TOPIC_NOT_EXIST,
五、小結
本篇文章主要是從原始碼的角度給大家介紹了RocketMQ的NameServer,包括NameServer的啟動流程、路由注冊、路由剔除和路由發現,我們在了解了NameServer的設計原理之后,也可以回過頭思考下在設計程序中一些值得我們學習的小技巧,在此我拋磚引玉提出兩點:
- 啟動流程注冊JVM鉤子用于優雅停機,這是一個編程技巧,我們在實際開發程序中,如果有使用執行緒池或者一些常駐執行緒任務時,可以考慮通過注冊JVM鉤子的方式,在JVM關閉前釋放資源或者完成一些事情來保證優雅停機,
- 更新路由表時需要通過加鎖防止并發操作,這里使用的是鎖粒度較少的讀寫鎖,允許多個訊息發送者并發讀,保證訊息發送時的高并發,但同一時刻NameServer只處理一個Broker心跳包,多個心跳包請求串行執行,這也是讀寫鎖經典使用場景,
經典使用場景,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/467096.html
標籤:其他