java-quartz计划任务

quartz计划任务是开源的任务调度服务,本文包含简单实例分析与源码分析

简单入门

入门DEMO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
SchedulerFactory schedFact = new org.quartz.impl.StdSchedulerFactory();

Scheduler sched = schedFact.getScheduler();

sched.start();

// define the job and tie it to our HelloJob class
JobDetail job = newJob(HelloJob.class)
.withIdentity("myJob", "group1")// name "myJob", group "group1"
.build();

// Trigger the job to run now, and then every 40 seconds
Trigger trigger = newTrigger()
.withIdentity("myTrigger", "group1")
.startNow()
.withSchedule(simpleSchedule()
.withIntervalInSeconds(40)
.repeatForever())
.build();

// Tell quartz to schedule the job using our trigger
sched.scheduleJob(job, trigger);
1
2
3
4
5
6
7
8
9
10
11
public class HelloJob implements Job {

public HelloJob() {
}

public void execute(JobExecutionContext context)
throws JobExecutionException
{
System.err.println("Hello! HelloJob is executing.");
}
}

关键介绍

主要API:

  • Scheduler 主API负责任务交互
  • Job Job是接口,其中只有一个execute方法,我们需要实现并且重写,正在需要执行的任务
  • JobDetail 任务细节,是任务的定义,会引用Job
  • Trigger 定义执行任务的规则,什么时候哪个Job需要实现CronTrigger,SimpleTrigger
  • JobBuilder DSL风格,通过Builder实现,建造者模式
  • TriggerBuilder DSL风格,通过Builder实现,建造者模式

name和group

JobDetail和Trigger都有name和group。

name是它们在这个sheduler里面的唯一标识。如果我们要更新一个JobDetail定义,只需要设置一个name相同的JobDetail实例即可。

group是一个组织单元,sheduler会提供一些对整组操作的API,比如 scheduler.resumeJobs()。

Misfire(错失触发)策略

类似的Scheduler资源不足的时候,或者机器崩溃重启等,有可能某一些Trigger在应该触发的时间点没有被触发,也就是Miss Fire了。这个时候Trigger需要一个策略来处理这种情况。每种Trigger可选的策略各不相同

Calendar

可以用来指定执行时间,或者排除特定的时间

JobDataMap

可以跟JobDetail或者Trigger关联,存储参数信息。还有一种方法不使用JobDataMap,可以在Job方法里定义参数通过get、set方法注入设置参数

Concurrency

注解
@DisallowConcurrentExecution 同一时刻只有一个实例会执行任务

@PersistJobDataAfterExecution
添加到 Job 类后,表示 Quartz 将会在成功执行 execute() 方法后(没有抛出异常)更新 JobDetail 的 JobDataMap,下一次执行相同的任务(JobDetail)将会得到更新后的值,而不是原始的值

JDBC-JobStore

通过JDBC存储任务信息,避免由于系统关闭导致任务漏执行

RAMJobStore

任务存储在内存中,系统被停止后所有的数据都会丢失

Scheduler

JobStore是会来存储运行时信息的,包括Trigger,Schduler,JobDetail,业务锁等

ThreadPool就是线程池

QuartzJobBean

与spring集成继承的一个类,实现executeInternal()方法,可以继承这个类里通过反射调用具体的任务

Trigger 属性

  • jobKey 任务定义
  • startTime 生效开始时间
  • endTime 生效结束时间

CronTrigger

表达式空格分隔属性含义:

  • Seconds
  • Minutes
  • Hours
  • Day-of-Month
  • Month
  • Day-of-Week
  • Year (可选)

“0 0 12 ? * WED” - which means “every Wednesday at 12:00:00 pm”

Cron表达式例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
每隔5秒执行一次:*/5 * * * * ?

每隔1分钟执行一次:0 */1 * * * ?

每天23点执行一次:0 0 23 * * ?

每天凌晨1点执行一次:0 0 1 * * ?

每月1号凌晨1点执行一次:0 0 1 1 * ?

每月最后一天23点执行一次:0 0 23 L * ?

每周星期天凌晨1点实行一次:0 0 1 ? * L

在26分、29分、33分执行一次:0 26,29,33 * * * ?

每天的0点、13点、18点、21点都执行一次:0 0 0,13,18,21 * * ?

项目中的集群模式用法1

原先任务模块跟Admin后台管理模块是做在一起的,而且提供了任务管理的功能。现在想把任务模块单独拆分出来独立部署。看了文档感觉可以这么做:

  • admin跟task任务模块都做好相同的任务配置,关闭admin模块的任务的自启动功能,admin负责任务任务管理,包括页面,开启关闭任务,任务Class任务方法名称配置
  • task负责具体业务执行,配置默认开启任务自启动功能
  • admin进行任务修改的时候会修改cron表里面的配置,task任务模块监听到改变会动态进行任务开启关闭更新任务周期
  • 通过反射的形式实现代码的低耦合,设计一个通用对象(保存需要调用的服务名,方法名,状态)通过反射调用真正需要被调用的job,可以只在task里面写各种任务job进行通用,而不是admin跟task都要同时存在job,把job存入数据库

启动过程分析

若quartz是配置在spring中,当服务器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,因此在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)创建Scheduler。SchedulerFactory在创建quartzScheduler的过程中,将会读取配置参数,初始化各个组件。通过线程池执行任务

调用StdSchedulerFactory中的initialize()方法初始化,加载quartz.properties配置,如果用户没有进行参数配置就调用默认配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    public Scheduler getScheduler() throws SchedulerException {
if (this.cfg == null) {
this.initialize();
}

SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(this.getSchedulerName());
if (sched != null) {
if (!sched.isShutdown()) {
return sched;
}

schedRep.remove(this.getSchedulerName());
}

sched = this.instantiate();// 具体初始化方法
return sched;
}

任务装配

执行计划任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException {
this.validateState();
if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null");
} else if (trigger == null) {
throw new SchedulerException("Trigger cannot be null");
} else if (jobDetail.getKey() == null) {
throw new SchedulerException("Job's key cannot be null");
} else if (jobDetail.getJobClass() == null) {
throw new SchedulerException("Job's class cannot be null");
} else {
OperableTrigger trig = (OperableTrigger)trigger;
if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException("Trigger does not reference given job!");
}

trig.validate();
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = this.resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}

Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException("Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
} else {
this.resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
this.notifySchedulerListenersJobAdded(jobDetail);
this.notifySchedulerThread(trigger.getNextFireTime().getTime());
this.notifySchedulerListenersSchduled(trigger);
return ft;
}
}
}

分布式和集群能力原理分析

quartz通过数据库进行通讯,保障节点互相感知对方存在,正常的站点能够接管宕机的站点。补执行失败的任务。注意各个服务器之间做好时间同步

集群相关表:

表名 说明
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGERS 存储已配置的Trigger的信息
QRTZ_SIMPROP_TRIGGERS -

QRTZ_LOCKS中有5条记录,代表5把锁。
重点关注:

  • QuartzSchedulerThread run()方法
  • JobStoreTx executeInNonManagedTxLock(…) 方法
  • StdRowLockSemaphore
  • SimpleThreadPool

源码分析

Trigger

Trigger触发器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 获取触发器key
TriggerKey getKey();

JobKey getJobKey();

String getDescription();

String getCalendarName();

JobDataMap getJobDataMap();

int getPriority();
// 是否重复执行
boolean mayFireAgain();

Date getStartTime();

Date getEndTime();

Date getNextFireTime();

Date getPreviousFireTime();
// 获取某个时间后的运行时间
Date getFireTimeAfter(Date var1);

Date getFinalFireTime();
// 获取识别策略
int getMisfireInstruction();
// 获取触发器builder
TriggerBuilder<? extends Trigger> getTriggerBuilder();
// 获取调度器builder
ScheduleBuilder<? extends Trigger> getScheduleBuilder();
// 完成状态
public static enum CompletedExecutionInstruction {
NOOP,// 无
RE_EXECUTE_JOB,
SET_TRIGGER_COMPLETE,// 触发器执行完成
DELETE_TRIGGER,
SET_ALL_JOB_TRIGGERS_COMPLETE,
SET_TRIGGER_ERROR,// 执行错误
SET_ALL_JOB_TRIGGERS_ERROR;

private CompletedExecutionInstruction() {
}
}
// 触发器状态
public static enum TriggerState {
NONE,
NORMAL,// 正常
PAUSED,
COMPLETE,
ERROR,// 错误
BLOCKED;// 阻塞

private TriggerState() {
}
}

触发器的所有实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Trigger
CalendarIntervalTrigger (org.quartz)
CalendarIntervalTriggerImpl (org.quartz.impl.triggers)
MutableTrigger (org.quartz.spi)
OperableTrigger (org.quartz.spi)
AbstractTrigger (org.quartz.impl.triggers)
SimpleTrigger (org.quartz)
SimpleTriggerImpl (org.quartz.impl.triggers)
CoreTrigger (org.quartz.impl.triggers)
CalendarIntervalTriggerImpl (org.quartz.impl.triggers)
SimpleTriggerImpl (org.quartz.impl.triggers)
DailyTimeIntervalTriggerImpl (org.quartz.impl.triggers)
CronTriggerImpl (org.quartz.impl.triggers)
CronTrigger (org.quartz)
CronTriggerImpl (org.quartz.impl.triggers)
DailyTimeIntervalTrigger (org.quartz)
DailyTimeIntervalTriggerImpl (org.quartz.impl.triggers)

调度器Builder

1
2
3
4
5
ScheduleBuilder (org.quartz)   
CalendarIntervalScheduleBuilder (org.quartz)
DailyTimeIntervalScheduleBuilder (org.quartz)
SimpleScheduleBuilder (org.quartz)
CronScheduleBuilder (org.quartz)

触发器Builder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 私有构造函数
private TriggerBuilder()
// 创建一个Builder
public static TriggerBuilder<Trigger> newTrigger()
// 创建触发器
build()
// 根据name和默认的group创建trigger的key
public TriggerBuilder<T> withIdentity(String name)
public TriggerBuilder<T> withIdentity(String name, String group)
public TriggerBuilder<T> withIdentity(TriggerKey triggerKey)
// 描述
public TriggerBuilder<T> withDescription(String triggerDescription)
// 优先级
public TriggerBuilder<T> withPriority(int triggerPriority)
// 日期
public TriggerBuilder<T> modifiedByCalendar(String calName)
// 开始时间
public TriggerBuilder<T> startAt(Date triggerStartTime)
// 立即执行
public TriggerBuilder<T> startNow()
// 结束时间
public TriggerBuilder<T> endAt(Date triggerEndTime)
// 调度器
public <SBT extends T> TriggerBuilder<SBT> withSchedule(ScheduleBuilder<SBT> schedBuilder)
// 设置作业
public TriggerBuilder<T> forJob(JobKey keyOfJobToFire)
……

简单触发器SimpleTrigger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 构造函数私有化
protected SimpleScheduleBuilder()
// 获取简单调度器
public static SimpleScheduleBuilder simpleSchedule()
// 1分钟执行(一直执行)
public static SimpleScheduleBuilder repeatMinutelyForever()
// 每隔几分钟执行(一直执行)
public static SimpleScheduleBuilder repeatMinutelyForever(int minutes)
// 1秒执行(一直执行)
public static SimpleScheduleBuilder repeatSecondlyForever()
// 间隔时间为1分钟,总的执行次数为count
public static SimpleScheduleBuilder repeatMinutelyForTotalCount(int count
……
public MutableTrigger build()// 创建一个Trigger
// 几秒钟重复执行
public SimpleScheduleBuilder withIntervalInMilliseconds(long intervalInMillis)
public SimpleScheduleBuilder withIntervalInSeconds(int intervalInSeconds)
public SimpleScheduleBuilder withIntervalInMinutes(int intervalInMinutes)
public SimpleScheduleBuilder withIntervalInHours(int intervalInHours)
// 重复执行册数
public SimpleScheduleBuilder withRepeatCount(int triggerRepeatCount)
// 以错过的第一个频率时间立刻开始执行
// 重做错过的所有频率周期后
// 当下一次触发频率发生时间大于当前时间后,再按照正常的频率依次执行
public SimpleScheduleBuilder withMisfireHandlingInstructionIgnoreMisfires()
……

参考