任务调度-使用quartz

文章目标:

1、了解任务调度的应用场景和 Quartz 的基本特性

2、掌握 Quartz Java 编程和 Spring 集成的使用

3、掌握 Quartz 动态调度和集群部署的实现

1、任务调度场景

在业务系统中有很多这样的场景:

1、账单日或者还款日上午 10 点,给每个信用卡客户发送账单通知,还款通知。如 何判断客户的账单日、还款日,完成通知的发送

2、银行业务系统,夜间要完成跑批的一系列流程,清理数据,下载文件,解析文件, 对账清算、切换结算日期等等。如何触发一系列流程的执行

3、金融机构跟人民银行二代支付系统对接,人民银行要求低于 5W 的金额(小额支付)半个小时打一次包发送,以缓解并发压力。所以,银行的跨行转账分成了多个流程: 录入、复核、发送。如何把半个小时以内的所有数据一次性发送

类似于这种

  • 基于准确的时刻或者固定的时间间隔触发的任务
  • 有批量数据需要处理
  • 要实现两个动作解耦的场景

我们都可以用任务调度来实现。

2、任务调度需求分析

  1. 可以定义触发的规则,比如基于时刻、时间间隔、表达式。
  2. 可以定义需要执行的任务。比如执行一个脚本或者一段代码。任务和规则是 分开的。
  3. 集中管理配置,持久配置。不用把规则写在代码里面,可以看到所有的任务 配置,方便维护。重启之后任务可以再次调度——配置文件或者配置中心。
  4. 支持任务的串行执行,例如执行 A 任务后再执行 B 任务再执行 C 任务。
  5. 支持多个任务并发执行,互不干扰(例如 ScheduledThreadPoolExecutor)。
  6. 有自己的调度器,可以启动、中断、停止任务。
  7. 容易集成到 Spring。

3、任务调度工具和框架

  • Linux crontab,Windows 计划任务

  • MySQL、Oracle

  • Kettle

  • jdk自带的Timer、ScheduledThreadPool

  • Spring Task--@Scheduled 也是使用ScheduledThreadPool实现的
  • quartz,XXL-JOB,Elastic-Job

4、Quartz使用入门

4.1 基本介绍

官网

Quartz 的意思是石英,像石英表一样精确。

Quartz 是一个老牌的任务调度系统,98 年构思,01 年发布到 sourceforge。现在更新比较慢,因为已经非常成熟了。

Quartz 的目的就是让任务调度更加简单,开发人员只需要关注业务即可。他是用 Java 语言编写的(也有.NET 的版本)。Java 代码能做的任何事情,Quartz 都可以调度。

特点:

  • 精确到毫秒级别的调度
  • 可以独立运行,也可以集成到容器中
  • 支持事务(JobStoreCMT )
  • 支持集群
  • 支持持久化

4.2 API使用

引入依赖:

1
2
3
4
5
<dependency> 
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>

默认配置:

org.quartz.core 包下,有一个默认的配置文件,quartz.properties。当我们没有 定义一个同名的配置文件的时候,就会使用默认配置文件里面的配置。

1
2
3
4
5
6
7
8
9
10
org.quartz.scheduler.instanceName: DefaultQuartzScheduler 
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

创建三大对象:

  • 创建Job:任务
1
2
3
4
5
public class MyJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("假发在哪里买的");
}
}

在使用到 Job时,需要进一步包装成 JobDetail。必须要指定 JobName 和 groupName,两个合起 来是唯一标识符。可以携带 KV 的数据(JobDataMap),用于扩展属性,在运行的时候可以从context 获取到。

  • 创建Trigger:就是调度策略

  • 创建Scheduler:调度器

通过 Factory 获取调度器的实例,把 JobDetail 和 Trigger 绑定,注册到容器中。Scheduler 先启动后启动无所谓,只要有 Trigger 到达触发条件,就会执行任务。

详细代码请见:

4.3 体系结构总结

总体架构

4.3.1 JobDetail

我们创建一个实现 Job 接口的类,使用 JobBuilder 包装成 JobDetail,它可以携带 KV 的数据。

4.3.2 Trigger

定义任务的触发规则,Trigger,使用 TriggerBuilder 来构建。

JobDetail 跟 Trigger 是 1:N 的关系。

思考:为什么任务和规则要解耦?

因为任务调度规则复杂的时候可以配置多个trigger来实现。

Trigger 接口在 Quartz 有 4 个继承的子接口:

子接口 描述 特点
SimpleTrigger 简单触发器 固定时刻或时间间隔,毫秒
CalendarIntervalTrigger 基于日历的触发器 比简单触发器更多时间单位,支持非固定时 间的触发,例如一年可能 365/366,一个月 可能28/29/30/31
DailyTimeIntervalTrigger 基于日期的触发器 每天的某个时间段
CronTrigger 基于 Cron 表达式的触发器 最常用的
  • SimpleTrigger

    SimpleTrigger 可以定义固定时刻或者固定时间间隔的调度规则(精确到毫秒)。 例如:每天 9 点钟运行;每隔 30 分钟运行一次。

  • CalendarIntervalTrigger

    CalendarIntervalTrigger 可以定义更多时间单位的调度需求,精确到秒。 好处是不需要去计算时间间隔,比如 1 个小时等于多少毫秒。 例如每年、每个月、每周、每天、每小时、每分钟、每秒。 每年的月数和每个月的天数不是固定的,这种情况也适用。

  • DailyTimeIntervalTrigger

    每天的某个时间段内,以一定的时间间隔执行任务。

    例如:每天早上 9 点到晚上 9 点,每隔半个小时执行一次,并且只在周一到周六执 行。

  • CronTrigger

    CronTirgger 可以定义基于 Cron 表达式的调度规则,是最常用的触发器类型。

Trigger示例代码

Cron 表达式

位置 时间域 取值范围 特殊值
1 0-59 ,-*/
2 分钟 0-59 ,-*/
3 小时 0-23 ,-*/
4 日期 1-31 ,-*?/ L W C
5 月份 1-12 ,-*/
6 星期 1-7 ,-*?/ L W C
7 年份(可选) 1-31 ,-*/

星号(*):可用在所有字段中,表示对应时间域的每一个时刻,例如,在分钟字段时,表示“每分钟”;

问号(?):该字符只在日期和星期字段中使用,它通常指定为“无意义的值”,相当于点位符;

减号(-):表达一个范围,如在小时字段中使用“10-12”,则表示从 10 到 12 点,即 10,11,12;

逗号(,):表达一个列表值,如在星期字段中使用“MON,WED,FRI”,则表示星期一,星期三和星期五;

斜杠(/):x/y 表达一个等步长序列,x 为起始值,y 为增量步长值。如在分钟字段中使用 0/15,则表示为 0,15,30 和45 秒,而 5/15 在分钟字段中表示 5,20,35,50,你也可以使用*/y,它等同于 0/y;

L:该字符只在日期和星期字段中使用,代表“Last”的意思,但它在两个字段中意思不同。L 在日期字段中,表示 这个月份的最后一天,如一月的 31 号,非闰年二月的 28 号;如果 L 用在星期中,则表示星期六,等同于 7。但是,如 果 L 出现在星期字段里,而且在前面有一个数值 X,则表示“这个月的最后 X 天”,例如,6L 表示该月的最后星期五;

W:该字符只能出现在日期字段里,是对前导日期的修饰,表示离该日期最近的工作日。例如 15W 表示离该月 15 号最近的工作日,如果该月 15 号是星期六,则匹配 14 号星期五;如果 15 日是星期日,则匹配 16 号星期一;如果 15 号是星期二,那结果就是 15 号星期二。但必须注意关联的匹配日期不能够跨月,如你指定 1W,如果 1 号是星期六, 结果匹配的是 3 号星期一,而非上个月最后的那天。W 字符串只能指定单一日期,而不能指定日期范围;

LW 组合:在日期字段可以组合使用 LW,它的意思是当月的最后一个工作日;

井号(#):该字符只能在星期字段中使用,表示当月某个工作日。如 6#3 表示当月的第三个星期五(6 表示星期五, #3 表示当前的第三个),而 4#5 表示当月的第五个星期三,假设当月没有第五个星期三,忽略不触发;

C:该字符只在日期和星期字段中使用,代表“Calendar”的意思。它的意思是计划所关联的日期,如果日期没有 被关联,则相当于日历中所有日期。例如 5C 在日期字段中就相当于日历 5 日以后的第一天。1C 在星期字段中相当于 星期日后的第一天。

Cron 表达式对特殊字符的大小写不敏感,对代表星期的缩写英文大小写也不敏感。

上面我们定义的都是在什么时间执行,但是我们有一些在什么时间不执行的需求, 比如:理财周末和法定假日购买不计息;证券公司周末和法定假日休市。

基于 Calendar 的排除规则

如果要在触发器的基础上,排除一些时间区间不执行任务,就要用到 Quartz 的 Calendar 类(注意不是 JDK 的 Calendar)。可以按年、月、周、日、特定日期、Cron 表达式排除。

调用 Trigger 的 modifiedByCalendar()添加到触发器中,并且调用调度器的 addCalendar()方法注册排除规则。

Calendar的种类:

  • BaseCalendar 为高级的 Calendar 实现了基本的功能,实现了 org.quartz.Calendar 接口
  • AnnualCalendar 排除年中一天或多天
  • CronCalendar 日历的这种实现排除了由给定的 CronExpression 表达的时间集合。 例如, 您可以使用此日历使用表达式“* * 0-7,18-23?* *”每天排除所有营业时间(上午 8 点至下午 5 点)。 如果 CronTrigger 具有给定的 cron 表达式并且与具有相同表达式的 CronCalendar 相关联,则日历将排除触发器包含的所有时间,并且它们将彼此抵消。
  • DailyCalendar 您可以使用此日历来排除营业时间(上午 8 点 - 5 点)每天。 每个 DailyCalendar 仅允许指定单个时间范围,并且该时间范围可能不会跨越每 日边界(即,您不能指定从上午 8 点至凌晨 5 点的时间范围)。 如果属 性 invertTimeRange 为 false(默认),则时间范围定义触发器不允许触发 的时间范围。 如果 invertTimeRange 为 true,则时间范围被反转 - 也就是 排除在定义的时间范围之外的所有时间。
  • HolidayCalendar 特别的用于从 Trigger 中排除节假日
  • MonthlyCalendar 排除月份中的指定数天,例如,可用于排除每月的最后一天
  • WeeklyCalendar 排除星期中的任意周几,例如,可用于排除周末,默认周六和周日

Calendar示例代码

4.3.3 Scheduler

调度器,是 Quartz 的指挥官,由 StdSchedulerFactory 产生。它是单例的。

并且是 Quartz 中最重要的 API,默认是实现类是 StdScheduler,里面包含了一个 QuartzScheduler。QuartzScheduler 里面又包含了一个 QuartzSchedulerThread。

Scheduler 中的方法主要分为三大类:

  1. 操作调度器本身,例如调度器的启动 start()、调度器的关闭 shutdown()。
  2. 操作 Trigger,例如 pauseTriggers()、resumeTrigger()。
  3. 操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()

这些方法非常重要,可以实现任务的动态调度。

Scheduler示例代码

4.3.4 Listener

我们有这么一种需求,在每个任务运行结束之后发送通知给运维管理员。那是不是 要在每个任务的最后添加一行代码呢?这种方式对原来的代码造成了入侵,不利于维护。 如果代码不是写在任务代码的最后一行,怎么知道任务执行完了呢?或者说,怎么监测 到任务的生命周期呢?

观察者模式:定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则 所有依赖它的对象都会得到通知并自动更新。

Quartz 中提供了三种 Listener,监听 Scheduler 的,监听 Trigger 的,监听 Job 的。 只需要创建类实现相应的接口,并在 Scheduler 上注册 Listener,便可实现对核心对象的监听。

  1. JobListener

定义了4个方法:

方法 作用或执行实际
getName() 返回 JobListener 的名称
jobToBeExecuted() Scheduler 在 JobDetail 将要被执行时调用这个方法
jobExecutionVetoed() Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个 方法
jobWasExecuted() Scheduler 在 JobDetail 被执行之后调用这个方法
  1. TriggerListener
方法 作用或执行实际
getName() 获取名称
triggerFired() Trigger 被触发,Job 上的 execute() 方法将要被执行时,Scheduler 就调用这个 方法
vetoJobExecution() 在 Trigger 触发后,Job 将要被执行时由 Scheduler 调用这个方法。 TriggerListener 给了一个选择去否决 Job 的执行。假如这个方法返回 true,这 个 Job 将不会为此次 Trigger 触发而得到执行
triggerMisfired() Trigger 错过触发时调用
triggerComplete() Trigger 被触发并且完成了 Job 的执行时,Scheduler 调用这个方法
  1. SchedulerListener

方法很多,省略

Listener示例代码

4.3.5 JobStore

Jobstore 用来存储任务和触发器相关的信息,例如所有任务的名称、数量、状态等 等。Quartz 中有两种存储任务的方式,一种在内存,一种是在数据库。

  • RAMJobStore

Quartz 默认的 JobStore 是 RAMJobstore,也就是把任务和触发器信息运行的信息 存储在内存中,用到了 HashMap、TreeSet、HashSet 等等数据结构。

如果程序崩溃或重启,所有存储在内存中的数据都会丢失。所以我们需要把这些数 据持久化到磁盘。

  • JDBCJobStore

JDBCJobStore 可以通过 JDBC 接口,将任务运行数据保存在数据库中。

JDBC 的实现方式有两种,JobStoreSupport 类的两个子类:

JobStoreTX:在独立的程序中使用,自己管理事务,不参与外部事务。

JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事务时,使用它。

使用 JDBCJobSotre 时,需要配置数据库信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX 
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties,不使用默认配置
org.quartz.jobStore.useProperties:true
#数据库中 quartz 表的表名前缀
org.quartz.jobStore.tablePrefix:QRTZ_
org.quartz.jobStore.dataSource:myDS

#配置数据源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8 org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual

问题来了?需要建什么表?表里面有什么字段?字段类型和长度是什么?

在官网的 Downloads 链接中,提供了 11 张表的建表语句: quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables

2.3 的版本在这个路径下:src\org\quartz\impl\jdbcjobstore

表名与作用:

表名 作用
QRTZ_BLOB_TRIGGERS Trigger 作为 Blob 类型存储
QRTZ_CALENDARS 存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS 存储 CronTrigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的 Trigger 相关的状态信息,以及相关 Job 的执行信息
QRTZ_JOB_DETAILS 存储每一个已配置的 Job 的详细信息
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例
QRTZ_SIMPLE_TRIGGERS 存储 SimpleTrigger 的信息,包括重复次数、间隔、以及已触的次数
QRTZ_SIMPROP_TRIGGERS 存储 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 两种类型的触发器
QRTZ_TRIGGERS 存储已配置的 Trigger 的信息

5、与spring集成

Spring-quartz 工程

Spring 在 spring-context-support.jar 中直接提供了对 Quartz 的支持。

使用三个FactoryBean配置核心对象

  • JobDetailFactoryBean 实现的是FactoryBean
  • FactoryBean 对不同的Trigger实现有不同的FactoryBean
  • SchedulerFactoryBean 实现的是FactoryBean

配置方式可以选择xml方式或者java注解配置。

6、动态调度

传统的 Spring 方式集成,由于任务信息全部配置在 xml 文件中,如果需要操作任务 或者修改任务运行频率,只能重新编译、打包、部署、重启,如果有紧急问题需要处理, 会浪费很多的时间。

有没有可以动态调度任务的方法?比如停止一个 Job?启动一个 Job?修改 Job 的 触发频率?

读取配置文件、写入配置文件、重启 Scheduler 或重启应用明显是不可取的。 对于这种频繁变更并且需要实时生效的配置信息,我们可以放到哪里?

ZK、Redis、DB tables。

并且,可以提供一个界面,实现对数据表的轻松操作。

6.1 配置管理

这里我们用最简单的数据库的实现。

建一张什么样的表?参考 JobDetail 的属性。

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `sys_job` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`job_name` varchar(512) NOT NULL COMMENT '任务名称',
`job_group` varchar(512) NOT NULL COMMENT '任务组名',
`job_cron` varchar(512) NOT NULL COMMENT '时间表达式',
`job_class_path` varchar(1024) NOT NULL COMMENT '类路径,全类型',
`job_data_map` varchar(1024) DEFAULT NULL COMMENT '传递 map 参数',
`job_status` int(2) NOT NULL COMMENT '状态:1 启用 0 停用',
`job_describe` varchar(1024) DEFAULT NULL COMMENT '任务功能描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

6.2 数据操作与任务调度

操作数据表非常简单,SSM 增删改查。 但是在修改了表的数据之后,怎么让调度器知道呢?

调度器的接口:Scheduler 在我们的需求中,我们需要做的事情:

1、 新增一个任务
2、 删除一个任务
3、 启动、停止一个任务
4、 修改任务的信息(包括调度规律)

因此可以把相关的操作封装到一个工具类中。 代码

6.3 容器启动与Service注入

6.3.1 容器启动

因为任务没有定义在 ApplicationContext.xml 中,而是放到了数据库中,Spring Boot 启动时,怎么读取任务信息?

或者,怎么在 Spring 启动完成的时候做一些事情?

创建一个类,实现 CommandLineRunner 接口,实现 run 方法。

从表中查出状态是 1 的任务,然后构建。

6.3.2 Service 类注入到 Job 中

Spring的 Bean 如何注入到实现了 Job 接口的类中?

例如在 SendMailTask 中,需要注入 ISysJobService,查询数据库发送邮件。 如果没有任何配置,注入会报空指针异常。

原因:

因为定时任务 Job 对象的实例化过程是在 Quartz 中进行的,而 Service Bean 是由Spring 容器管理的,Quartz 察觉不到 Service Bean 的存在,所以无法将 Service Bean 装配到 Job 对象中。

分析:

Quartz 集成到 Spring 中,用到 SchedulerFactoryBean,其实现了 InitializingBean 方法,在唯一的方法 afterPropertiesSet()在 Bean 的属性初始化后调用。

调度器用 AdaptableJobFactory 对 Job 对象进行实例化。所以,如果我们可以把这 个 JobFactory 指定为我们自定义的工厂的话,就可以在 Job 实例化完成之后,把 Job 纳入到 Spring 容器中管理。

解决这个问题的步骤:

1、定义一个 AdaptableJobFactory,实现 JobFactory 接口,实现接口定义的newJob 方法,在这里面返回 Job 实例

2、定义一个 MyJobFactory,继承 AdaptableJobFactory。使用 Spring 的 AutowireCapableBeanFactory,把 Job 实例注入到容器中。

3、指定 Scheduler 的 JobFactory 为自定义的 JobFactory。

7、集群部署

为什么需要集群?

1、防止单点故障,减少对业务的影响

2、减少节点的压力,例如在 10 点要触发 1000 个任务,如果有 10 个节点,则每个 节点之需要执行 100 个任务

集群需要解决的问题?

1、任务重跑,因为节点部署的内容是一样的,到 10 点的时候,每个节点都会执行 相同的操作,引起数据混乱。比如跑批,绝对不能执行多次。

2、任务漏跑,假如任务是平均分配的,本来应该在某个节点上执行的任务,因为节 点故障,一直没有得到执行。

3、水平集群需要注意时间同步问题

4、Quartz 使用的是随机的负载均衡算法,不能指定节点执行

如何解决?

所以必须要有一种共享数据或者通信的机制。在分布式系统的不同节点中,我们可 以采用什么样的方式,实现数据共享?

两两通信,或者基于分布式的服务,实现数据共享。

例如:ZK、Redis、DB。

在 Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了。

同样的问题:建什么表?哪些字段?

依旧使用quartz系统自带的 11 张表。

集群配置与验证

quartz.properties 配置

四个配置:集群实例 ID、集群开关、数据库持久化、数据源信息

注意先清空 quartz 所有表、改端口、两个任务频率改成一样

验证 1:先后启动 2 个节点,任务是否重跑

验证 2:停掉一个节点,任务是否漏跑