版本:3.1.0-SNAPSHOT git地址:https://github.com/apache/shardingsphere-elasticjob Maven 坐標
<dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-spring-boot-starter</artifactId> <version>${latest.version}</version> </dependency>Spring.factories配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration
在添加elasticjob-lite-spring-boot-starter依賴的時候,會自動加載ElasticJobLiteAutoConfiguration。接下來看下ElasticJobLiteAutoConfiguration中所做的處理:
ElasticJobLiteAutoConfiguration.java
/** * ElasticJob-Lite auto configuration. */ @Configuration(proxyBeanMethods = false) @AutoConfigureAfter(DataSourceAutoConfiguration.class) /** * elastic job 開關 * elasticjob.enabled.ture默認為true */ @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "https://www.cnblogs.com/lingyujuan/archive/2023/04/26/true", matchIfMissing = true) /** * 匯入 * ElasticJobRegistryCenterConfiguration.class 注冊中心配置 * ElasticJobTracingConfiguration.class job事件追蹤配置 * ElasticJobSnapshotServiceConfiguration.class 快照配置 */ @Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class}) /** * job相關配置資訊 */ @EnableConfigurationProperties(ElasticJobProperties.class) public class ElasticJobLiteAutoConfiguration { @Configuration(proxyBeanMethods = false) /** * ElasticJobBootstrapConfiguration.class 創建job beans 注入spring容器 * ScheduleJobBootstrapStartupRunner.class 執行型別為ScheduleJobBootstrap.class 的job開始運行 */ @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class}) protected static class ElasticJobConfiguration { } }Elastic-job 是利用zookeeper 實作分布式job的功能,所以在自動裝配的時候,需要有zookeeper注冊中心的配置, 自動裝配主要做了4件事事 1.配置zookeeper 客戶端資訊,啟動連接zookeeper. 2.配置事件追蹤資料庫,用于保存job運行記錄 3.決議所有job組態檔,將所有job的bean放置在spring 單例bean中 4.識別job型別,在zookeeper節點上處理job節點資料,運行定時任務job. 第一件事:配置zookeeper 客戶端資訊,啟動連接zookeeper. ZookeeperRegistryCenter.class
public void init() { log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists()); CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() //設定zookeeper 服務器地址 .connectString(zkConfig.getServerLists()) //設定重試機制 .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds())) //設定命名空間,zookeeper節點名稱 .namespace(zkConfig.getNamespace()); //設定session超時時間 if (0 != zkConfig.getSessionTimeoutMilliseconds()) { builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds()); } //設定連接超時時間 if (0 != zkConfig.getConnectionTimeoutMilliseconds()) { builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds()); } if (!Strings.isNullOrEmpty(zkConfig.getDigest())) { builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8)) .aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); //zookeeper 客戶端開始啟動 client.start(); try { //zookeeper 客戶端一直連接 if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) { client.close(); throw new KeeperException.OperationTimeoutException(); } //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON RegExceptionHandler.handleException(ex); } }
第二件事: 配置事件追蹤資料庫,用于保存job運行記錄
ElasticJobTracingConfiguration.java
/** * Create a bean of tracing DataSource. * * @param tracingProperties tracing Properties * @return tracing DataSource */ @Bean("tracingDataSource") //spring中注入bean name 為tracingDataSource的job資料庫連接資訊 public DataSource tracingDataSource(final TracingProperties tracingProperties) { //獲取elastic-job 資料庫配置 DataSourceProperties dataSource = tracingProperties.getDataSource(); if (dataSource == null) { return null; } HikariDataSource tracingDataSource = new HikariDataSource(); tracingDataSource.setJdbcUrl(dataSource.getUrl()); BeanUtils.copyProperties(dataSource, tracingDataSource); return tracingDataSource; } /** * Create a bean of tracing configuration. * * @param dataSource required by constructor * @param tracingDataSource tracing ataSource * @return a bean of tracing configuration */ @Bean @ConditionalOnBean(DataSource.class) @ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "https://www.cnblogs.com/lingyujuan/archive/2023/04/26/RDB") public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) { /** * dataSource 是業務資料庫 * tracingDataSource 是job資料庫 * 當配置elasticjob.tracing.type = RDB時,如果單獨配置job資料庫是,默認使用job資料庫作為job運行軌跡的記錄 * 但這邊同時業務資料庫和job追蹤資料庫同時注入是,mybatis-plus 結合@Table 使用的時候,很有可能找不到正確對應的資料源 */ DataSource ds = tracingDataSource; if (ds == null) { ds = dataSource; } return new TracingConfiguration<>("RDB", ds); }
通過elasticjob.tracing.type=RDB的配置開啟事件追蹤功能;job的事件追蹤資料源可以和業務資料源分開配置。
第三件事:解析所有job組態檔
ElasticJobBootstrapConfiguration.class
public void createJobBootstrapBeans() { //獲取job配置 ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class); //獲取單利注冊物件 SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory(); //獲取注入zookeeper 客戶端 CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class); //獲取job事件追蹤 TracingConfiguration<?> tracingConfig = getTracingConfiguration(); //構造JobBootstraps constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig); }
重要的是constructJobBootstraps 這個方法,來看下
private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) { //遍歷配置的每一個job for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) { ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue(); //校驗 job class 和 type 都為空拋例外 Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass() || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()), "Please specific [elasticJobClass] or [elasticJobType] under job configuration."); //校驗 job class 和 type 都有 報相互排斥 Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass() || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()), "[elasticJobClass] and [elasticJobType] are mutually exclusive."); if (null != jobConfigurationProperties.getElasticJobClass()) { //通過class 注入job registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties); } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) { //通過type 注入job registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties); } } }Job 有兩種型別的注入,第一種是是class,配置成job的全路徑資訊注入 再來看看registerClassedJob 方法里的內容
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) { //獲取job配置 JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName); //配置job事件追蹤 jobExtraConfigurations(jobConfig, tracingConfig); //獲取job型別 ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass()); //沒有配置cron運算式 就初始化為OneOffJobBootstrap物件,一次性任務 if (Strings.isNullOrEmpty(jobConfig.getCron())) { Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job."); singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig)); } else { //有配置cron運算式 就初始化為ScheduleJobBootstrap物件,定時任務 //設定bean name String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap"; //注入ScheduleJobBootstrap物件為單利物件 singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig)); } }Class 型別注入的job有兩種型別 1.ScheduleJobBootstrap:定時任務型別的job, 2.OneOffJobBootstrap:一定次job型別, 看下定義的new ScheduleJobBootstrap 方法
/**
 * Creates the scheduler for one job: sets up the facades, stores the job configuration,
 * builds the executor and finally creates the schedule controller.
 *
 * @param regCenter registry center used by the facades
 * @param elasticJob job instance, must not be null
 * @param jobConfig job configuration
 * @throws IllegalArgumentException if {@code elasticJob} is null
 */
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    // Resolve the job listeners for this job configuration.
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // Facade used for set-up; per the article it bundles the services that operate on
    // ZooKeeper nodes together with the job listeners.
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    // Class name of the job, as reported by the job class name provider.
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    // Keep the configuration returned by the set-up facade.
    // NOTE(review): the article says it is stored under a "config" node in ZooKeeper - not visible here.
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // Facade used for scheduling; per the article it bundles the ZooKeeper node services.
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    // Validate the job properties.
    validateJobProperties();
    // Executor that runs the job.
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    // Hand the guarantee service to the listeners.
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    // Create the schedule controller (see createJobScheduleController).
    jobScheduleController = createJobScheduleController();
}
看下createJobScheduleController
/**
 * Creates the schedule controller for this job, registers it in the {@code JobRegistry}
 * under the job name and registers the start-up information.
 *
 * @return the newly created schedule controller
 */
private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    // Register the controller under the job name.
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    // Register start-up information.
    registerStartUpInfo();
    return result;
}
看下registerStartUpInfo方法
/**
 * Registers the start-up information: starts the listeners, runs leader election, persists
 * the server and instance online state and starts the reconcile service if it is not running.
 *
 * @param enabled passed through to {@code serverService.persistOnline}
 */
public void registerStartUpInfo(final boolean enabled) {
    // Start all listeners.
    listenerManager.startAllListeners();
    // Elect the leader.
    // NOTE(review): the article places the elected server under /{namespace}/leader/election/instance - not visible here.
    leaderService.electLeader();
    // Persist the server's online state together with the enabled flag.
    serverService.persistOnline(enabled);
    // Persist this instance as online.
    // NOTE(review): the article describes an ephemeral node under /{namespace}/instances - not visible here.
    instanceService.persistOnline();
    // Start the reconcile service asynchronously unless it is already running.
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
這里實行的操作: 1.開啟所有監聽器處理 2.leader選舉 3.持久化節點資料 4.開啟異步服務 第四步:4.識別job型別,在zookeeper節點上處理job節點資料,運行定時任務job.
@Override public void run(final String... args) { log.info("Starting ElasticJob Bootstrap."); applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule); log.info("ElasticJob Bootstrap started."); }獲取到所有的定時任務job(ScheduleJobBootstrap型別),執行schedule方法,底層實際使用quartz框架運行定時任務,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551286.html
標籤:其他
上一篇:java 多執行緒的start()和run()的理解
下一篇:返回列表