主頁 > 後端開發 > elastic-job原始碼(1)- job自動裝配

elastic-job原始碼(1)- job自動裝配

2023-04-27 07:29:24 後端開發

版本:3.1.0-SNAPSHOT git地址:https://github.com/apache/shardingsphere-elasticjob   Maven 坐標
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>
  Spring.factories配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

在添加elasticjob-lite-spring-boot-starter啟動類的時候,會自動加載ElasticJobLiteAutoConfiguration,接下來看下ElasticJobLiteAutoConfiguration中所做的處理,   ElasticJobLiteAutoConfiguration.java
/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)


/**
 * elastic job 開關
 * elasticjob.enabled.ture默認為true
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "https://www.cnblogs.com/lingyujuan/archive/2023/04/26/true", matchIfMissing = true)


/**
 * 匯入
 * ElasticJobRegistryCenterConfiguration.class 注冊中心配置
 * ElasticJobTracingConfiguration.class job事件追蹤配置
 * ElasticJobSnapshotServiceConfiguration.class 快照配置
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})


/**
 * job相關配置資訊
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
    
    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class  創建job beans 注入spring容器
     * ScheduleJobBootstrapStartupRunner.class  執行型別為ScheduleJobBootstrap.class 的job開始運行
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}

Elastic-job 是利用zookeeper 實作分布式job的功能,所以在自動裝配的時候,需要有zookeeper注冊中心的配置, 自動裝配主要做了4件事事 1.配置zookeeper 客戶端資訊,啟動連接zookeeper. 2.配置事件追蹤資料庫,用于保存job運行記錄 3.決議所有job組態檔,將所有job的bean放置在spring 單例bean中 4.識別job型別,在zookeeper節點上處理job節點資料,運行定時任務job.   第一件事:配置zookeeper 客戶端資訊,啟動連接zookeeper. ZookeeperRegistryCenter.class
public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            //設定zookeeper 服務器地址
            .connectString(zkConfig.getServerLists())
            //設定重試機制
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            //設定命名空間,zookeeper節點名稱
            .namespace(zkConfig.getNamespace());
    //設定session超時時間
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    //設定連接超時時間
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {
                
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                
                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    //zookeeper 客戶端開始啟動
    client.start();
    try {
        //zookeeper 客戶端一直連接
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}

 

第二件事: 配置事件追蹤資料庫,用于保存job運行記錄

ElasticJobTracingConfiguration.java

 

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
//spring中注入bean name 為tracingDataSource的job資料庫連接資訊
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    //獲取elastic-job 資料庫配置
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}


/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing ataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "https://www.cnblogs.com/lingyujuan/archive/2023/04/26/RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
    /**
     * dataSource 是業務資料庫
     * tracingDataSource 是job資料庫
     * 當配置elasticjob.tracing.type = RDB時,如果單獨配置job資料庫是,默認使用job資料庫作為job運行軌跡的記錄
     * 但這邊同時業務資料庫和job追蹤資料庫同時注入是,mybatis-plus 結合@Table 使用的時候,很有可能找不到正確對應的資料源
     */
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

 

通過elasticjob.tracing.type=RDB的配置開啟事件追蹤功能,這邊job的事件追蹤資料源可以和業務資料源配置不一樣,

 

第三件事:決議所有job組態檔

ElasticJobBootstrapConfiguration.class

 

public void createJobBootstrapBeans() {
    //獲取job配置
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    //獲取單利注冊物件
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    //獲取注入zookeeper 客戶端
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    //獲取job事件追蹤
    TracingConfiguration<?> tracingConfig = getTracingConfiguration();
    //構造JobBootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

重要的是constructJobBootstraps 這個方法,來看下

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    //遍歷配置的每一個job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        //校驗 job class 和 type 都為空拋例外
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        //校驗 job class 和 type 都有 報相互排斥
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");


        if (null != jobConfigurationProperties.getElasticJobClass()) {
            //通過class 注入job
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            //通過type 注入job
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}

Job 有兩種型別的注入,第一種是是class,配置成job的全路徑資訊注入   再來看看registerClassedJob 方法里的內容
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    //獲取job配置
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    //配置job事件追蹤
    jobExtraConfigurations(jobConfig, tracingConfig);
    //獲取job型別
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    //沒有配置cron運算式 就初始化為OneOffJobBootstrap物件,一次性任務
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        //有配置cron運算式 就初始化為ScheduleJobBootstrap物件,定時任務
        //設定bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        //注入ScheduleJobBootstrap物件為單利物件
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

Class 型別注入的job有兩種型別 1.ScheduleJobBootstrap:定時任務型別的job, 2.OneOffJobBootstrap:一定次job型別,   看下定義的new ScheduleJobBootstrap 方法
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    //獲取job監聽器
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // 集成所有操作zookeeper 節點的services,job 監聽器
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    //獲取當前job名稱
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    //zookeeper節點 {namespace}/{jobclassname}/config 放置job配置資訊
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // 集成所有操作zookeeper 節點的services
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    //檢驗job配置
    validateJobProperties();
    //定義job執行器
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    //監聽器里注入GuaranteeService
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    //創建定時任務,開始執行
    jobScheduleController = createJobScheduleController();
}

 

看下createJobScheduleController

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    //注冊job
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    //注冊器開始運行
    registerStartUpInfo();
    return result;
}

看下registerStartUpInfo方法

public void registerStartUpInfo(final boolean enabled) {
    //開始所有的監聽器
    listenerManager.startAllListeners();
    //選舉leader /{namespace}/leader/election/instance 放置選舉出來的服務器
    leaderService.electLeader();
    //{namespace}/{ipservers} 設定enable處理
    serverService.persistOnline(enabled);
    //臨時節點   /{namespave}/instances 放置運行服務實體資訊
    instanceService.persistOnline();
    //開啟一個異步服務
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

這里實行的操作: 1.開啟所有監聽器處理 2.leader選舉 3.持久化節點資料 4.開啟異步服務   第四步:4.識別job型別,在zookeeper節點上處理job節點資料,運行定時任務job.  
@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}

獲取到所有的定時任務job(ScheduleJobBootstrap型別),執行schedule方法,底層實際使用quartz框架運行定時任務,          

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551286.html

標籤:其他

上一篇:java 多執行緒的start()和run()的理解

下一篇:返回列表

標籤雲
其他(158170) Python(38107) JavaScript(25394) Java(18001) C(15217) 區塊鏈(8260) C#(7972) AI(7469) 爪哇(7425) MySQL(7148) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5870) 数组(5741) R(5409) Linux(5329) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4562) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2431) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1960) Web開發(1951) HtmlCss(1927) python-3.x(1918) 弹簧靴(1913) C++(1912) xml(1889) PostgreSQL(1874) .NETCore(1855) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • elastic-job原始碼(1)- job自動裝配

    版本:3.1.0-SNAPSHOT git地址:https://github.com/apache/shardingsphere-elasticjob Maven 坐標 <dependency> <groupId>org.apache.shardingsphere.elasticjob</group ......

    uj5u.com 2023-04-27 07:29:24 more
  • java 多執行緒的start()和run()的理解

    run()方法中是各個執行緒要執行的具體內容。所以當一個執行緒直接呼叫run()時那么直接開始執行方法體,這是在main執行緒中的多個執行緒只能時按照順序的等待前面的執行緒結束run()方法的執行。 而呼叫start方法只是執行緒進入準備階段(Ready),并沒有真正執行,這需要JVM進行分配時間片進行輪轉執行緒 ......

    uj5u.com 2023-04-27 07:29:20 more
  • 面向物件可視化工具:UML類圖

    1. UML類圖 UML(Unified Modeling Language,統一建模語言),用來描述軟體模型和架構的圖形化語言。 常用的UML工具軟體有PowerDesinger、Rose和Enterprise Architect。 UML工具軟體不僅可以繪制軟體開發中所需的各種圖表,還可以生成對 ......

    uj5u.com 2023-04-27 07:29:14 more
  • Java中關于String類以及字串拼接的問題

    String類部分原始碼 //被final修飾不可被繼承 public final class String implements java.io.Serializable, Comparable<String>, CharSequence { //String維護char[] 所以不可修改 priv ......

    uj5u.com 2023-04-27 07:29:06 more
  • Python生成亂數的一個標準庫-random

    1.介紹 Random庫Python中用于生成亂數的一個標準庫。計算機沒有辦法產生真正的亂數,但它可以產生偽亂數。 偽亂數是計算機按照一定的運算規則產生的一些資料,只不過這些資料表現為亂數的形式。計算機中采用梅森旋轉演算法生成為隨機序列,序列中的每一個元素就是偽亂數,由于計算機不能產生真正 ......

    uj5u.com 2023-04-27 07:29:01 more
  • go中 for回圈的坑

    go中 for回圈的坑 在使用for回圈修改結構體切片中的值時,發現并沒有修改成功。 type Dog struct { name string } func (d *Dog) setNewName(name string) { d.name = name } func main() { d := ......

    uj5u.com 2023-04-27 07:28:56 more
  • sourceTree合并一次提交的內容

    sourceTree合并一次提交的內容 在基于git的開發中,經常遇到不同分支需要合并某一次特定的提交的代碼,而不是合并整個代碼。 場景:A分支是通用分支,B分支是私有化分支,現在A分支修改了一個通用的功能,需要合并到B分支上,功能在一次提交上。B分支只需要這次提交的代碼,對A分支上改動的其他代碼都 ......

    uj5u.com 2023-04-27 07:28:49 more
  • 16例外處理

    例外處理 例外 例外即是一個事件,該事件會在程式執行程序中發生,影響了程式的正常執行。 一般情況下,在Python無法正常處理程式時就會發生一個例外。 例外是Python物件,表示一個錯誤。 當Python腳本發生例外時我們需要捕獲處理它,否則程式會終止執行。 捕獲例外 例外型別捕獲 # 捕獲常規異 ......

    uj5u.com 2023-04-27 07:28:42 more
  • Java中抽象類和介面的區別?

    什么是抽象類? 抽象類是對具體概念的抽象 抽象類本質是為了繼承 只能被public或默認修飾 行為層面抽象出來抽象方法 抽象類的注意事項 抽象類不可以被直接實體化 抽象類中可以存在構造方法 抽象類可以存在普通方法 抽象方法的注意 抽象方法必須定義在抽象類中 僅宣告 實作需要交給子類 抽象方法不能用p ......

    uj5u.com 2023-04-27 07:28:38 more
  • Java8 教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Java 8 (又稱為 jdk 1.8) 是 Java 語言開發的一個主要版本。 Java 8 是oracle公司于2014年3月發布,可以看成是自Java 5 以來最具革命性的版本。Java 8為Java語言、編譯器、類別庫、開發工具與JVM帶來了大量新特性。 Java 8入門教程 - 從 ......

    uj5u.com 2023-04-27 07:28:33 more