主頁 > 軟體設計 > 百度十年架構師帶你深入剖析RocketMQ原始碼-NameServer

百度十年架構師帶你深入剖析RocketMQ原始碼-NameServer

2022-04-29 08:29:50 軟體設計

尊重原創著作權: https://www.gewuweb.com/hot/18198.html

百度十年架構師帶你深入剖析RocketMQ原始碼-NameServer

尊重原創著作權:https://www.gewuweb.com/sitemap.html

一、RocketMQ架構簡介

1.1 邏輯部署圖

百度十年架構師帶你深入剖析RocketMQ原始碼-NameServer

(圖片來自網路)

1.2 核心組件說明

通過上圖可以看到,RocketMQ的核心組件主要包括4個,分別是NameServer、Broker、Producer和Consumer,下面我們先依次簡單說明下這四個核心組件:

** NameServer ** :NameServer充當路由資訊的提供者,生產者或消費者能夠通過NameServer查找各Topic相應的Broker
IP串列,多個Namesrver實體組成集群,但相互獨立,沒有資訊交換,

** Broker **
:訊息中轉角色,負責存盤訊息、轉發訊息,Broker服務器在RocketMQ系統中負責接收從生產者發送來的訊息并存盤、同時為消費者的拉取請求作準備,Broker服務器也存盤訊息相關的元資料,包括消費者組、消費進度偏移和主題和佇列訊息等,

** Producer **
:負責生產訊息,一般由業務系統負責生產訊息,一個訊息生產者會把業務應用系統里產生的訊息發送到Broker服務器,RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送,同步和異步方式均需要Broker回傳確認資訊,單向發送不需要,

** Consumer **
:負責消費訊息,一般是后臺系統負責異步消費,一個訊息消費者會從Broker服務器拉取訊息、并將其提供給應用程式,從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費,

除了上面說的三個核心組件外,還有Topic這個概念下面也會多次提到:

** Topic **
:表示一類訊息的集合,每個Topic包含若干條訊息,每條訊息只能屬于一個Topic,是RocketMQ進行訊息訂閱的基本單位,一個Topic可以分片在多個Broker集群上,每一個Topic分片包含多個queue,具體結構可以參考下圖:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

1.3 設計理念

RocketMQ是基于主題的發布與訂閱模式,核心功能包括訊息發送、訊息存盤、訊息消費,整體設計追求簡單與性能第一,歸納來說主要是下面三種:

  • NameServer取代ZK充當注冊中心,NameServer集群間互不通信,容忍路由資訊在集群內分鐘級不一致,更加輕量級;
  • 使用記憶體映射機制實作高效的IO存盤,達到高吞吐量;
  • 容忍設計缺陷,通過ACK確保訊息至少消費一次,但是如果ACK丟失,可能訊息重復消費,這種情況設計上允許,交給使用者自己保證,

本文重點介紹的就是NameServer,我們下面一起來看下NameServer是如何啟動以及如何進行路由管理的,

二、NameServer架構設計

在第一章已經簡單介紹了NameServer取代zk作為一種更輕量級的注冊中心充當路由資訊的提供者,那么具體是如何來實作路由資訊管理的呢?我們先看下圖:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

上面的圖描述了NameServer進行路由注冊、路由剔除和路由發現的核心原理,

** 路由注冊 **
:Broker服務器在啟動的時候會想NameServer集群中所有的NameServer發送心跳信號進行注冊,并會每隔30秒向nameserver發送心跳,告訴NameServer自己活著,NameServer接收到Broker發送的心跳包之后,會記錄該broker資訊,并保存最近一次收到心跳包的時間,

** 路由剔除 **
:NameServer和每個Broker保持長連接,每隔30秒接收Broker發送的心跳包,同時自身每個10秒掃描BrokerLiveTable,比較上次收到心跳時間和當前時間比較是否大于120秒,如果超過,那么認為Broker不可用,剔除路由表中該Broker相關資訊,

** 路由發現 **
:路由發現不是實時的,路由變化后,NameServer不主動推給客戶端,等待producer定期拉取最新路由資訊,這樣的設計方式降低了NameServer實作的復雜性,當路由發生變化時通過在訊息發送端的容錯機制來保證訊息發送的高可用(這塊內容會在后續介紹producer訊息發送時介紹,本文不展開講解),

** 高可用 **
:NameServer通過部署多臺NameServer服務器來保證自身的高可用,同時多個NameServer服務器之間不進行通信,這樣路由資訊發生變化時,各個NameServer服務器之間資料可能不是完全相同的,但是通過發送端的容錯機制保證訊息發送的高可用,這個也正是NameServer追求簡單高效的目的所在,

三、 啟動流程

在整理了解了NameServer的架構設計之后,我們先來看下NameServer到底是如何啟動的呢?

既然是原始碼解讀,那么我們先來看下代碼入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[]
args),實際呼叫的是main0()方法,

代碼如下:

public static NamesrvController main0(String[] args) {

    try {
        //創建namesrvController
        NamesrvController controller = createNamesrvController(args);
        //初始化并啟動NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
?
    return null;
}

通過main方法啟動NameServer,主要分為兩大步,先創建NamesrvController,然后再初始化并啟動NamesrvController,我們分別展開來分析,

3.1 時序圖

具體展開閱讀代碼之前,我們先通過一個序列圖對整體流程有個了解,如下圖:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

3.2 創建NamesrvController

先來看核心代碼,如下:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // 設定版本號為當前版本號
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
  //構造org.apache.commons.cli.Options,并添加-h -n引數,-h引數是列印幫助資訊,-n引數是指定namesrvAddr
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    //初始化commandLine,并在options中添加-c -p引數,-c指定nameserver的組態檔路徑,-p標識列印配置資訊
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
  //nameserver配置類,業務引數
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //netty服務器配置類,網路引數
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //設定nameserver的埠號
    nettyServerConfig.setListenPort(9876);
    //命令帶有-c引數,說明指定組態檔,需要根據組態檔路徑讀取組態檔內容,并將檔案中配置資訊賦值給NamesrvConfig和NettyServerConfig
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            //反射的方式
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
      //設定組態檔路徑
            namesrvConfig.setConfigStorePath(file);
?
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
  //命令列帶有-p,說明是列印引數的命令,那么就列印出NamesrvConfig和NettyServerConfig的屬性,在啟動NameServer時可以先使用./mqnameserver -c configFile -p列印當前加載的配置屬性 
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        //列印引數命令不需要啟動nameserver服務,只需要列印引數即可
        System.exit(0);
    }
  //決議命令列引數,并加載到namesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  //檢查ROCKETMQ_HOME,不能為空
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
  //初始化logback日志工廠,rocketmq默認使用logback作為日志輸出
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
?
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
?
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
  //創建NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
?
    //將全域Properties的內容復制到NamesrvController.Configuration.allConfigs中
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
?
    return controller;
}

通過上面對每一行代碼的注釋,可以看出來,創建NamesrvController的程序主要分為兩步:

Step1:通過命令列中獲取配置,賦值給NamesrvConfig和NettyServerConfig類,

Step2:根據配置類NamesrvConfig和NettyServerConfig構造一個NamesrvController實體,

可見NamesrvConfig和NettyServerConfig是想當重要的,這兩個類分別是NameServer的業務引數和網路引數,我們分別看下這兩個類里面有哪些屬性:

** NamesrvConfig **

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

** NettyServerConfig **

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

注:Apache Commons CLI是開源的命令列決議工具,它可以幫助開發者快速構建啟動命令,并且幫助你組織命令的引數、以及輸出串列等,

3.3 初始化并啟動

創建了NamesrvController實體之后,開始初始化并啟動NameServer,

首先進行初始化,代碼入口是NamesrvController#initialize,

public boolean initialize() {
  //加載kvConfigPath下kvConfig.json組態檔里的KV配置,然后將這些配置放到KVConfigManager#configTable屬性中
    this.kvConfigManager.load();
  //根據nettyServerConfig初始化一個netty服務器,
    //brokerHousekeepingService是在NamesrvController實體化時建構式里實體化的,該類負責Broker連接事件的處理,實作了ChannelEventListener,主要用來管理RouteInfoManager的brokerLiveTable
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  //初始化負責處理Netty網路互動資料的執行緒池,默認執行緒數是8個
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  //注冊Netty服務端業務處理邏輯,如果開啟了clusterTest,那么注冊的請求處理類是ClusterTestRequestProcessor,否則請求處理類是DefaultRequestProcessor
    this.registerProcessor();
  //注冊心跳機制執行緒池,延遲5秒啟動,每隔10秒遍歷RouteInfoManager#brokerLiveTable這個屬性,用來掃描不存活的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
  //注冊列印KV配置執行緒池,延遲1分鐘啟動、每10分鐘列印出kvConfig配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
  //rocketmq可以通過開啟TLS來提高資料傳輸的安全性,如果開啟了,那么需要注冊一個監聽器來重新加載SslContext
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
?
    return true;
}

上面的代碼是NameServer初始化流程,通過每行代碼的注釋,可以看出來,主要有5步驟操作:

  • Step1:加載KV配置,并寫入到KVConfigManager的configTable屬性中;
  • Step2:初始化netty服務器;
  • Step3:初始化處理netty網路互動資料的執行緒池;
  • Step4:注冊心跳機制執行緒池,啟動5秒后每隔10秒檢測一次Broker的存活情況;
  • Step5:注冊列印KV配置的執行緒池,啟動1分鐘后,每隔10分鐘列印一次KV配置,

RocketMQ的開發團隊還使用了一個常用的編程技巧,就是使用JVM鉤子函式對NameServer進行優雅停機,這樣在JVM行程關閉前,會先執行shutdown操作,

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

執行start函式,啟動NameServer,代碼比較簡單,就是將第一步中創建的netty
server進行啟動,其中remotingServer.start()方法不展開詳細說明了,需要對netty比較熟悉,不是本篇文章重點,有興趣的同學可以自行下載原始碼閱讀,

public void start() throws Exception {
    //啟動netty服務
    this.remotingServer.start();
  //如果開啟了TLS
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

四、路由管理

我們在第二章開篇有了解到NameServer作為一個輕量級的注冊中心,主要是為訊息生產者和消費者提供Topic的路由資訊,并對這些路由資訊和Broker節點進行管理,主要包括路由注冊、路由剔除和路由發現,

本章將會通過原始碼的角度來具體分析NameServer是如果進行路由資訊管理的,核心代碼主要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中實作,

4.1 路由元資訊

在了解路由資訊管理之前,我們首先需要了解NameServer到底存盤了哪些路由元資訊,資料結構分別是什么樣的,

查看代碼我們可以看到主要通過5個屬性來維護路由元資訊,如下:

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

我們依次對這5個屬性進行展開說明,

4.1.1 TopicQueueTable

說明:Topic訊息佇列路由資訊,訊息發送時根據路由表進行負載均衡,

資料結構:HashMap結構,key是Topic名字,value是一個型別是QueueData的佇列集合,在第一章就講過,一個Topic中有多個佇列,QueueData的資料結構如下:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

** 資料結構示例: **

topicQueueTable:{
    "topic1": [
        {
            "brokerName": "broker-a",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6,
            "topicSynFlag":0,
        },
        {
            "brokerName": "broker-b",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6,
            "topicSynFlag":0,
        }
    ]
}

4.1.2 BrokerAddrTable

說明:Broker基礎資訊,包含BrokerName、所屬集群名稱、主備Broker地址,

** 資料結構 **
:HashMap結構,key是BrokerName,value是一個型別是BrokerData的物件,BrokerData的資料結構如下(可以結合下面Broker主從結構邏輯圖來理解):

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

Broker主從結構邏輯圖:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

資料結構示例:

brokerAddrTable:{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": {
            0: "192.168.1.1:10000",
            1: "192.168.1.2:10000"
        }
    },
    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": {
            0: "192.168.1.3:10000",
            1: "192.168.1.4:10000"
        }
    }
}

4.1.3 ClusterAddrTable

說明:Broker集群資訊,存盤集群中所有Broker名稱,

資料結構:HashMap結構,key是ClusterName,value是存盤BrokerName的Set結構,

資料結構示例:

clusterAddrTable:{
    "c1": ["broker-a","broker-b"]
}

4.1.4 BrokerLiveTable

說明:Broker狀態資訊,NameServer每次收到心跳包時會替換該資訊

** 資料結構 **
:HashMap結構,key是Broker的地址,value是BrokerLiveInfo結構的該Broker資訊物件,BrokerLiveInfo的資料結構如下:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

資料結構示例:

brokerLiveTable:{
    "192.168.1.1:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":""
    },
    "192.168.1.2:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":"192.168.1.1:10000"
     },
    "192.168.1.3:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":""
     },
    "192.168.1.4:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":"192.168.1.3:10000"
     }
}

4.1.5 filterServerTable

說明:Broker上的FilterServer串列,訊息過濾服務器串列,后續介紹Consumer時會介紹,consumer拉取資料是通過filterServer拉取,consumer向Broker注冊,

** 資料結構 ** :HashMap結構,key是Broker地址,value是記錄了filterServer地址的List集合,

4.2 路由注冊

路由注冊是通過Broker和NameServer之間的心跳功能來實作的,主要分為兩步:

** Step1: **

Broker啟動時向集群中所有NameServer發送心跳陳述句,每隔30秒(默認30s,時間間隔在10秒到60秒之間)再發一次,

** Step2: **

NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable,

我們分別展開分析這兩步,

4.2.1 Broker發送心跳包

發送心跳包的核心邏輯是在Broker啟動邏輯里,代碼入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重點關注的是發送心跳包的邏輯實作,只列出發送心跳包的核心代碼,如下:

1)創建了一個執行緒池注冊Broker,程式啟動10秒后執行,每隔30秒(默認30s,時間間隔在10秒到60秒之間,BrokerConfig.getRegisterNameServerPeriod()的默認值是30秒)執行一次,

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

2)封裝Topic配置和版本號之后,進行實際的路由注冊(注:封裝Topic配置不是本篇重點,會在介紹Broker原始碼時重點講解),實際路由注冊是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中實作,核心代碼如下:

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
?
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    //獲取nameserver地址串列
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    /**
      *封裝請求包頭start
      *封裝請求包頭,主要封裝broker相關資訊
    **/
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
    //封裝requestBody,包括topic和filterServerList相關資訊
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        /**
      *封裝請求包頭end
    **/
        //開啟多執行緒到每個nameserver進行注冊
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //實際進行注冊方法
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            //封裝nameserver回傳的資訊
                            registerBrokerResultList.add(result);
                        }
?
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
?
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
?
    return registerBrokerResultList;
}

從上面代碼來看,也比較簡單,首先需要封裝請求包頭和requestBody,然后開啟多執行緒到每個NameServer服務器去注冊,

請求包頭型別為RegisterBrokerRequestHeader,主要包括如下欄位:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

requestBody型別是RegisterBrokerBody,主要包括如下欄位:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

1)實際的路由注冊是通過registerBroker方法實作,核心代碼如下:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
    //創建請求指令,需要注意RequestCode.REGISTER_BROKER,nameserver端的網路處理器會根據requestCode進行相應的業務處理
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
  //基于netty進行網路傳輸
    if (oneway) {
        //如果是單向呼叫,沒有回傳值,不回傳nameserver回傳結果
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }
  //異步呼叫向nameserver發起注冊,獲取nameserver的回傳資訊
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            //獲取回傳的reponseHeader
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            //重新封裝回傳結果,更新masterAddr和haServerAddr
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }
?
    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}

borker和NameServer之間通過netty進行網路傳輸,Broker向NameServer發起注冊時會在請求中添加注冊碼RequestCode.REGISTER_BROKER,這是一種網路跟蹤方法,RocketMQ的每個請求都會定義一個requestCode,服務端的網路處理器會根據不同的requestCode進行影響的業務處理,

4.2.2 NameServer處理心跳包

Broker發出路由注冊的心跳包之后,NameServer會根據心跳包中的requestCode進行處理,NameServer的默認網路處理器是DefaultRequestProcessor,具體代碼如下:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
                  request.getCode(),
                  RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                  request);
    }
    switch (request.getCode()) {
        ......
        //,如果是RequestCode.REGISTER_BROKER,進行broker注冊
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        ......
        default:
            break;
    }
    return null;
}

判斷requestCode,如果是RequestCode.REGISTER_BROKER,那么確定業務處理邏輯是注冊Broker,根據Broker版本號選擇不同的方法,我們已V3_0_11以上為例,呼叫registerBrokerWithFilterServer方法進行注冊主要步驟分為三步:

** Step1 ** :

決議requestHeader并驗簽(基于crc32),判斷資料是否正確;

** Step2 ** :

決議Topic資訊;

** Step3 ** :

呼叫RouteInfoManager#registerBroker來進行Broker注冊;

核心注冊邏輯是由RouteInfoManager#registerBroker來實作,核心代碼如下:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            //加寫鎖,防止并發寫RoutInfoManager中的路由表資訊,
            this.lock.writeLock().lockInterruptibly();
      //根據clusterName從clusterAddrTable中獲取所有broker名字集合
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            //如果沒有獲取到,說明broker所屬集群還沒記錄,那么需要創建,并將brokerName加入到集群的broker集合中
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);
      
            boolean registerFirst = false;
      //根據brokerName嘗試從brokerAddrTable中獲取brokerData
            BrokerData brokerData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                //如果沒獲取到brokerData,新建BrokerData并放入brokerAddrTable,registerFirst設為true;
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            //更新brokerData中的brokerAddrs
            Map brokerAddrsMap = brokerData.getBrokerAddrs();
            //考慮到可能出現master掛了,slave變成master的情況,這時候brokerId會變成0,這時候需要把老的brokerAddr給洗掉
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
      //更新brokerAddrs,根據回傳的oldAddr判斷是否是第一次注冊的broker
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
?
            //如過Broker是Master,并且Broker的Topic配置資訊發生變化或者是首次注冊,需要創建或更新Topic路由元資料,填充topicQueueTable
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry entry : tcTable.entrySet()) {
                            //創建或更新Topic路由元資料
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
      //更新BrokerLivelnfo,BrokeLivelnfo是執行路由洗掉的重要依據
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                                                                         new BrokerLiveInfo(
                                                                             System.currentTimeMillis(),
                                                                             topicConfigWrapper.getDataVersion(),
                                                                             channel,
                                                                             haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
      //注冊Broker的filterServer地址串列
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
      //如果此Broker為從節點,則需要查找Broker Master的節點資訊,并更新對應masterAddr屬性
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
?
    return result;
}

通過上面的原始碼分析,可以分解出一個Broker的注冊主要分7步:

  • Step1:加寫鎖,防止并發寫RoutInfoManager中的路由表資訊;
  • Step2:判斷Broker所屬集群是否存在,不存在需要創建,并將Broker名加入到集群Broker集合中;
  • Step3:維護BrokerData;
  • Step4:如過Broker是Master,并且Broker的Topic配置資訊發生變化或者是首次注冊,需要創建或更新Topic路由元資料,填充TopicQueueTable;
  • Step5:更新BrokerLivelnfo;
  • Step6:注冊Broker的filterServer地址串列;
  • Step7:如果此Broker為從節點,則需要查找Broker Master的節點資訊,并更新對應masterAddr屬性,并回傳給Broker端,

4.3 路由剔除

4.3.1 觸發條件

路由剔除的觸發條件主要有兩個:

NameServer每隔10s掃描BrokerLiveTable,連續120s沒收到心跳包,則移除該Broker并關閉socket連接;

Broker正常關閉時觸發路由洗掉,

4.3.2 原始碼決議

上面描述的觸發點最終洗掉路由的邏輯是一樣的,統一在RouteInfoManager#onChannelDestroy

中實作,核心代碼如下:

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                //加讀鎖
                this.lock.readLock().lockInterruptibly();
                //通過channel從brokerLiveTable中找出對應的Broker地址
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                //釋放讀鎖
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
  //若該Broker已經從存活的Broker地址串列中被清除,則直接使用remoteAddr
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }
?
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
?
        try {
            try {
                //申請寫鎖
                this.lock.writeLock().lockInterruptibly();
                //根據brokerAddress,將這個brokerAddress從brokerLiveTable和filterServerTable中移除
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                //遍歷brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/itBrokerAddrTable.next().getValue();
?
                    Iterator> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        //根據brokerAddress找到對應的brokerData,并將brokerData中對應的brokerAddress移除
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                     brokerId, brokerAddr);
                            break;
                        }
                    }
          //如果移除后,整個brokerData的brokerAddress空了,那么將整個brokerData移除
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                 brokerData.getBrokerName());
                    }
                }
?
                if (brokerNameFound != null && removeBrokerName) {
                    //遍歷clusterAddrTable
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        //根據第三步中獲取的需要移除的brokerName,將對應的brokerName移除了
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                     brokerNameFound, clusterName);
              //如果移除后,該集合為空,那么將整個集群從clusterAddrTable中移除
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                         clusterName);
                                it.remove();
                            }
?
                            break;
                        }
                    }
                }
?
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    //遍歷topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
?
                        Iterator<QueueData> itQueueData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            //根據brokerName,將topic下對應的broker移除掉
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                         topic, queueData);
                            }
                        }
            //如果該topic下只有一個待移除的broker,那么該topic也從table中移除
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                     topic);
                        }
                    }
                }
            } finally {
                //釋放寫鎖
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

路由洗掉整體邏輯主要分為6步:

  • Step1:加readlock,通過channel從BrokerLiveTable中找出對應的Broker地址,釋放readlock,若該Broker已經從存活的Broker地址串列中被清除,則直接使用remoteAddr,
  • Step2:申請寫鎖,根據BrokerAddress從BrokerLiveTable、filterServerTable移除,
  • Step3:遍歷BrokerAddrTable,根據BrokerAddress找到對應的brokerData,并將brokerData中對應的brokerAddress移除,如果移除后,整個brokerData的brokerAddress空了,那么將整個brokerData移除,
  • Step4:遍歷clusterAddrTable,根據第三步中獲取的需要移除的BrokerName,將對應的brokerName移除了,如果移除后,該集合為空,那么將整個集群從clusterAddrTable中移除,
  • Step5:遍歷TopicQueueTable,根據BrokerName,將Topic下對應的Broker移除掉,如果該Topic下只有一個待移除的Broker,那么該Topic也從table中移除,
  • Step6:釋放寫鎖,

從上面可以看出,路由剔除的整體邏輯比較簡單,就是單純地針對路由元資訊的資料結構進行操作,為了大家能夠更好地理解這塊代碼,建議大家對照4.1中介紹的路由元資訊的資料結構來進行代碼走讀,

4.4 路由發現

當路由資訊發生變化之后,NameServer不會主動推送給客戶端,而是等待客戶端定期到nameserver主動拉取最新路由資訊,這種設計方式降低了NameServer實作的復雜性,

4.4.1 producer主動拉取

producer在啟動后會開啟一系列定時任務,其中有一個任務就是定期從NameServer獲取Topic路由資訊,代碼入口是MQClientInstance#start-
ScheduledTask(),核心代碼如下:

private void startScheduledTask() {
    ......
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
        @Override
        public void run() {
            try {
                //從nameserver更新最新的topic路由資訊
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
?
    ......
}
?
/**
    * 從nameserver獲取topic路由資訊
    */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
                                                      boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    ......
    //向nameserver發送請求包,requestCode為RequestCode.GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
  ......
}

producer和NameServer之間通過netty進行網路傳輸,producer向NameServer發起的請求中添加注冊碼

RequestCode.GET_ROUTEINFO_BY_TOPIC,

4.4.2 NameServer回傳路由資訊

NameServer收到producer發送的請求后,會根據請求中的requestCode進行處理,處理requestCode同樣是在默認的網路處理器DefaultRequestProcessor中進行處理,最終通過RouteInfoManager#pickupTopicRouteData來實作,

** TopicRouteData結構 **

在正式決議原始碼前,我們先看下NameServer回傳給producer的資料結構,通過代碼可以看到,回傳的是一個TopicRouteData物件,具體結構如下:

阿里十年架構師帶你深入剖析RocketMQ原始碼-NameServer

其中QueueData,BrokerData,filterServerTable在4.1章節介紹路由元資訊時有介紹,

** 原始碼分析 **

在了解了回傳給producer的TopicRouteData結構后,我們進入RouteInfoManager#pickupTopicRouteData方法來看下具體如何實作,

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set brokerNameSet = new HashSet();
    List brokerDataList = new LinkedList();
    topicRouteData.setBrokerDatas(brokerDataList);
?
    HashMap> filterServerMap = new HashMap>();
    topicRouteData.setFilterServerTable(filterServerMap);
?
    try {
        try {
            //加讀鎖
            this.lock.readLock().lockInterruptibly();
            //從元資料topicQueueTable中根據topic名字獲取佇列集合
            List queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                //將獲取到的佇列集合寫入topicRouteData的queueDatas中
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
?
                Iterator it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
        //遍歷從QueueData集合中提取的brokerName
                for (String brokerName : brokerNameSet) {
                    //根據brokerName從brokerAddrTable獲取brokerData
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        //克隆brokerData物件,并寫入到topicRouteData的brokerDatas中
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        //遍歷brokerAddrs
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            //根據brokerAddr獲取filterServerList,封裝后寫入到topicRouteData的filterServerTable中
                            List filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            //釋放讀鎖
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }
?
    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
?
    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }
?
    return null;
}

上面代碼封裝了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,還有orderTopicConf欄位沒封裝,我們再看下這個欄位是在什么時候封裝的,我們向上看RouteInfoManager#pickupTopicRouteData的呼叫方法DefaultRequestProcessor#getRouteInfoByTopic如下:

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
                                           RemotingCommand request) throws RemotingCommandException {
    ......
  //這塊代碼就是上面決議的代碼,獲取到topicRouteData物件
    TopicRouteData topicRouteData = https://www.cnblogs.com/lihanlin/archive/2022/04/28/this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
?
    if (topicRouteData != null) {
        //判斷nameserver的orderMessageEnable配置是否打開
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            //如果配置打開了,根據namespace和topic名字獲取kvConfig組態檔中順序訊息配置內容
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                                                        requestHeader.getTopic());
            //封裝orderTopicConf
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
?
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
  //如果沒有獲取到topic路由,那么reponseCode為TOPIC_NOT_EXIST
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                       + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

結合這兩個方法,我們可以總結出查找Topic路由主要分為3個步驟:

呼叫RouteInfoManager#pickupTopicRouteData,從topicQueueTable,brokerAddrTabl,filterServerTable中獲取資訊,分別填充queue-
Datas、BrokerDatas、filterServerTable,

如果topic為順序訊息,那么從KVconfig中獲取關于順序訊息先關的配置填充到orderTopicConf中,

如果找不到路由資訊,那么回傳code為ResponseCode.TOPIC_NOT_EXIST,

五、小結

本篇文章主要是從原始碼的角度給大家介紹了RocketMQ的NameServer,包括NameServer的啟動流程、路由注冊、路由剔除和路由發現,我們在了解了NameServer的設計原理之后,也可以回過頭思考下在設計程序中一些值得我們學習的小技巧,在此我拋磚引玉提出兩點:

  • 啟動流程注冊JVM鉤子用于優雅停機,這是一個編程技巧,我們在實際開發程序中,如果有使用執行緒池或者一些常駐執行緒任務時,可以考慮通過注冊JVM鉤子的方式,在JVM關閉前釋放資源或者完成一些事情來保證優雅停機,
  • 更新路由表時需要通過加鎖防止并發操作,這里使用的是鎖粒度較少的讀寫鎖,允許多個訊息發送者并發讀,保證訊息發送時的高并發,但同一時刻NameServer只處理一個Broker心跳包,多個心跳包請求串行執行,這也是讀寫鎖經典使用場景,

經典使用場景,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/467096.html

標籤:其他

上一篇:主流作業流引擎 flowable 三種方式部署流程

下一篇:主流作業流引擎 flowable 三種方式部署流程

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more