logo头像

生而无畏,战至终章

Elastic_Job简介和使用

本文于426天之前发表,文中内容可能已经过时。

Elastic-Job介绍

Elastic-Job是当当开源分布式调度解决方案,功能非常丰富,支持任务分片,能充分利用资源,这和之前介绍Quartz所不能做到的地方。Elastic-Job由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-CLoud组成,一般来说我们使用Elastic-Job-Lite就能够满足我们的需求,Elastic-Job的官方教程写的很详细,参见,本片文章主要介绍Elastic-Job_lite,本文做使用的Elastic-Job版本为2.1.5

Elastic-Job特点

Elastic-Job具有分布式调度,作业高可用、任务分片以及定制化流程等特点,具体说明如下:

  1. 分布式调度
    Elastic-Job重写了Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心,其作用仅用于作业注册和监控存储,而主作业点仅用于处理分片和清理等

  2. 作业高可用
    Elastic-Job提供了非常安全的执行作业方式,如果将分片总数设置为1,并使用大于1台的服务器执行作业,则作业会按照以1主n从的方式执行,一旦执行作业的服务器崩溃,等待执行的服务器将在下次作业启动时自动替补执行,后面的示例会展示

  3. 任务分片执行
    Elastic-Job提供了更灵活且更强大的作业吞吐量方式,就是任务的分布式执行,它将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项
    在实际使用过程中,我们通常将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量。
    例如,有3台服务器,被分成10片,那分片的结果可能为:服务器A=0,1,2,服务器B=3,4,5,服务器C=6,7,8,9,若服务器C崩溃了,则分片项的分配结果为:服务器A=0,1,2,3,4,服务器B=5,6,7,8,9

  4. 定制化流程任务
    Elastic-Job提供了简单类型的和数据流类型的两种模式,简单类型的实现SimpleJob接口,数据流类型实现DataflowJob接口,后面会介绍
    数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立的线程上,用于保证同一分片的顺序性

Elastic-Job实现

首先我们分两种方式来实现,先介绍第一种

1. 通过API的方式实现

  • maven引入相关jar包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
    </dependency>
  • 定义任务类

1
2
3
4
5
6
public class HelloElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("---Thread ID: %s,任务总片数:%s,当前分片项:%s---",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
}
  • 使用API接口配置作业信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 配置Zookeeper配置中心
*/
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter coordinatorRegistryCenter =
new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181","helloElasticJob"));
coordinatorRegistryCenter.init();
return coordinatorRegistryCenter;

}
/**
* 创建作业配置
* @return
*/
private static LiteJobConfiguration createJobConfiguration() {
//定义作业的核心配置
JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("helloElasticJob", "0/10 * * * * ?", 1).build();
//定义simple类型的配置
SimpleJobConfiguration jobConfiguration = new SimpleJobConfiguration(coreConfiguration, HelloElasticJob.class.getCanonicalName());
//定义Lite作业配置
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfiguration).build();
return liteJobConfiguration;
}
  • 创建测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class App {
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(),createJobConfiguration()).init();
}

/**
* 配置Zookeeper配置中心
*/
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter coordinatorRegistryCenter =
new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181","helloElasticJob"));
coordinatorRegistryCenter.init();
return coordinatorRegistryCenter;

}
/**
* 创建作业配置
* @return
*/
private static LiteJobConfiguration createJobConfiguration() {
//定义作业的核心配置
JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("helloElasticJob", "0/10 * * * * ?", 1).build();
//定义simple类型的配置
SimpleJobConfiguration jobConfiguration = new SimpleJobConfiguration(coreConfiguration, HelloElasticJob.class.getCanonicalName());
//定义Lite作业配置
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfiguration).build();
return liteJobConfiguration;
}
}

在启动Zookeeper后,运行就能看到效果

1
2
3
4
5
6
7
8
9
10
11
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---

2. 通过与Spring集成方式实现

  • 引入Mavne配置文件,在原pom文件中只需引入下面的依赖即可
1
2
3
4
5
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
  • 在spring配置文件中配置作业spring-elasticJob.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">

<!--配置作业注册中心-->
<reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181" namespace="hello" base-sleep-time-milliseconds="3000" max-retries="3"/>

<!--配置作业-->
<job:simple id="helloElasticJob" class="top.starlin.elasticJobDemo.HelloElasticJob"
registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3"/>
</beans>
  • 启动作业
1
2
3
4
5
6
7
public class App {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring-elasticJob.xml"});
context.start();
System.in.read();
}
}

单台启动的效果如下:

1
2
3
4
5
6
---Thread ID: 28,任务总片数:3,当前分片项:1---
---Thread ID: 27,任务总片数:3,当前分片项:0---
---Thread ID: 29,任务总片数:3,当前分片项:2---
---Thread ID: 30,任务总片数:3,当前分片项:0---
---Thread ID: 31,任务总片数:3,当前分片项:1---
---Thread ID: 32,任务总片数:3,当前分片项:2---

为了模拟分布式环境,我将此工程copy一份后,两台同时启动后的效果,当然我们也可以利用虚拟机来部署

第一个工程,启动结果

1
2
3
4
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---

第二个工程,启动结果:

1
2
3
4
5
6
7
8
---Thread ID: 28,任务总片数:3,当前分片项:2---
---Thread ID: 27,任务总片数:3,当前分片项:0---
---Thread ID: 29,任务总片数:3,当前分片项:0---
---Thread ID: 30,任务总片数:3,当前分片项:2---
---Thread ID: 32,任务总片数:3,当前分片项:2---
---Thread ID: 31,任务总片数:3,当前分片项:0---
---Thread ID: 34,任务总片数:3,当前分片项:2---
---Thread ID: 33,任务总片数:3,当前分片项:0---

我们能够发现,一共3个任务片数,分了2台机器来执行

如果将正在启动中的一台关闭(模拟服务器崩溃的情况),会怎样了
结果显示会自动替补执行

1
2
3
4
5
6
7
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---

---Thread ID: 27,任务总片数:3,当前分片项:0---
---Thread ID: 28,任务总片数:3,当前分片项:1---
---Thread ID: 29,任务总片数:3,当前分片项:2---

Elastic-Job作业类型

Elastic-Job作业类型分为3种,包括我们前面提到的SimpleJob,DataFlowJob这2种,还有一种为Script作业类型

  1. Simple作业类型
    简单实现的未经过任何封装的类型作业,需要实现SimpleJob接口,该接口提供了一个excute方法用于作业调度,它与原始的Quartz原生接口类似,但提供了可伸缩任务和任务分片功能

  2. DataFlow作业类型
    用于处理数据流,需要实现DataFlowJob接口,该接口提供了2个方法,分别用于抓取(fetchData)数据和处理(processData)数据,可以通过DataflowJobConfiguration配置是否流式处理。
    需要注意的是,一旦开启的流式处理,只有fetchData方法的返回值为null或者集合的长度为空时,作业才会停止,否则作业会一直执行下去,非流式的处理则只会在每次作业执行的过程中执行一次fetchData和processData方法

如果采用DataFlow类型作业处理,建议processData在处理数据后更新其状态,避免fetchData再次抓取到重复的数据,使得作业永远执行下去

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HelloJob implements DataflowJob {
@Override
public List<User> fetchData(ShardingContext shardingContext) {
List<User> users = null;
/** get users from database **/
return users;
}

@Override
public void processData(ShardingContext shardingContext, List<User> list) {
for (User user : data) {
System.out.println("");
user.setStatus(1);
/**
* update user
*/

}
}
}
  1. Script作业类型
    为脚本作业类型,支持Shell,Python,Perl等所有类型的脚本,这里不多介绍,平时用的也少,感兴趣的童鞋可以参考官网

其他

上述介绍的是最精简常用的功能。elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面(这里就不介绍了)