前言
Quartz是 OpenSymphony 开源组织在 Job Scheduling 领域又一个开源项目,是完全由 Java 开发的一个开源任务日程管理系统,“任务进度管理器”就是一个在预先确定(被纳入日程)的时间到达时,负责执行(或者通知)其他软件组件的系统。 Quartz 是一个开源的作业调度框架,它完全由 Java 写成,并设计用于 J2SE 和 J2EE 应用中,它提供了巨大的灵活性而不牺牲简单性
当定时任务愈加复杂时,使用 Spring 注解 @Schedule 已经不能满足业务需要
在项目开发中,经常需要定时任务来帮助我们来做一些内容,如定时派息、跑批对账、将任务纳入日程或者从日程中取消,开始,停止,暂停日程进度等。SpringBoot 中现在有两种方案可以选择,第一种是 SpringBoot 内置的方式简单注解就可以使用,当然如果需要更复杂的应用场景还是得 Quartz 上场,Quartz 目前是 Java 体系中最完善的定时方案
Quartz中主要用到了:Builder建造者模式、Factory工厂模式以及组件模式,我们要知道Quartz是如何调度的,需要知道三个概念:任务(Job,我们需要将具体的业务逻辑写到实现了Job接口的实现类中)、触发器(Trigger,它定义了任务的执行规则),最后是调度器(Scheduler,通过传入的任务Job和触发器Trigger,以指定的规则执行任务)
官方网站:http://quartz-scheduler.org
优点:
- 丰富的 Job 操作 API
- 支持多种配置
- SpringBoot 无缝集成
- 支持持久化
- 支持集群
- Quartz 还支持开源,是一个功能丰富的开源作业调度库,可以集成到几乎任何 Java 应用程序中
环境准备
存储
这里我们采用 MySQL 数据库存储的方式,首先需要新建 Quartz 的相关表,从官网下载:sql脚本下载地址,名称为tables_mysql.sql,创建成功后数据库中多出 11 张表
同时根据自身业务场景增加一张定时任务调度(业务)表(可选,可替)
# 任务调度表
DROP TABLE IF EXISTS qrtz_sys_job;
CREATE TABLE `qrtz_sys_job` (
`job_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '定时任务ID',
`job_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '定时任务名称',
`job_group` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '定时任务组',
`bean_target` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '目标bean名',
`bean_method_target` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '目标bean的方法名',
`cron_expression` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '执行表达式',
`concurrent` tinyint(4) NULL DEFAULT 0 COMMENT '是否并发执行,0允许,1不允',
`job_description` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '任务描述',
`create_date` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建时间',
`update_date` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`job_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '定时任务调度表' ROW_FORMAT = Dynamic;
commit;
针对Quartz表和业务表的说明简要如下:
表名 | 描述 | |
---|---|---|
qrtz_fired_triggers | 保存已经触发的触发器状态信息 | |
qrtz_paused_trigger_grps | 存放暂停掉的触发器表表 | |
qrtz_scheduler_state | 调度器状态表 | |
qrtz_locks | 存储程序的悲观锁的信息(假如使用了悲观锁) | |
qrtz_simple_triggers | 简单的触发器表 | |
qrtz_simprop_triggers | 存储两种类型的触发器表 | |
qrtz_cron_triggers | 定时触发器表 | |
qrtz_blob_triggers | 以blob 类型存储的触发器 | |
qrtz_calendars | 日历信息表 | |
qrtz_triggers | 触发器表 | |
qrtz_job_details | job 详细信息表 | |
qrtz_sys_job | 定时任务调度表 |
Maven依赖
这里使用 druid 作为数据库连接池(可选)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
配置文件
默认情况下,Quartz 会加载 classpath 下的 quartz.properties 作为配置文件。如果找不到,则会使用 quartz 框架自己 jar 包下 org/quartz 底下的 quartz.properties 文件
#主要分为scheduler、threadPool、jobStore、dataSource等部分
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.instanceName=DefaultQuartzScheduler
#如果您希望Quartz Scheduler通过RMI作为服务器导出本身,则将“rmi.export”标志设置为true
#在同一个配置文件中为'org.quartz.scheduler.rmi.export'和'org.quartz.scheduler.rmi.proxy'指定一个'true'值是没有意义的,如果你这样做'export'选项将被忽略
org.quartz.scheduler.rmi.export=false
#如果要连接(使用)远程服务的调度程序,则将“org.quartz.scheduler.rmi.proxy”标志设置为true。您还必须指定RMI注册表进程的主机和端口 - 通常是“localhost”端口1099
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
#实例化ThreadPool时,使用的线程类为SimpleThreadPool
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#threadCount和threadPriority将以setter的形式注入ThreadPool实例
#并发个数 如果你只有几个工作每天触发几次 那么1个线程就可以,如果你有成千上万的工作,每分钟都有很多工作 那么久需要50-100之间.
#只有1到100之间的数字是非常实用的
org.quartz.threadPool.threadCount=5
#优先级 默认值为5
org.quartz.threadPool.threadPriority=5
#可以是“true”或“false”,默认为false
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
#在被认为“misfired”(失火)之前,调度程序将“tolerate(容忍)”一个Triggers(触发器)将其下一个启动时间通过的毫秒数。默认值(如果您在配置中未输入此属性)为60000(60秒)
org.quartz.jobStore.misfireThreshold=5000
# 默认存储在内存中,RAMJobStore快速轻便,但是当进程终止时,所有调度信息都会丢失
#org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore
#持久化方式,默认存储在内存中,此处使用数据库方式
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#您需要为JobStore选择一个DriverDelegate才能使用。DriverDelegate负责执行特定数据库可能需要的任何JDBC工作
# StdJDBCDelegate是一个使用“vanilla”JDBC代码(和SQL语句)来执行其工作的委托,用于完全符合JDBC的驱动程序
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#可以将“org.quartz.jobStore.useProperties”配置参数设置为“true”(默认为false),以指示JDBCJobStore将JobDataMaps中的所有值都作为字符串,
#因此可以作为名称 - 值对存储而不是在BLOB列中以其序列化形式存储更多复杂的对象。从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题
org.quartz.jobStore.useProperties=false
#表前缀
org.quartz.jobStore.tablePrefix=QRTZ_
#数据源别名,自定义
org.quartz.jobStore.dataSource=qzDS
#使用阿里的druid作为数据库连接池
org.quartz.dataSource.qzDS.connectionProvider.class=com.windseeker.common.quartz.config.DruidPoolingconnectionProvider
org.quartz.dataSource.qzDS.URL=jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
org.quartz.dataSource.qzDS.user=root
org.quartz.dataSource.qzDS.password=123456
org.quartz.dataSource.qzDS.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.maxConnections=10
#设置为“true”以打开群集功能。如果您有多个Quartz实例使用同一组数据库表,则此属性必须设置为“true”,否则您将遇到破坏
#org.quartz.jobStore.isClustered=false
编码
config
package com.windseeker.common.quartz.config;
import org.quartz.Scheduler;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.io.IOException;
import java.util.Properties;
/**
* QuartzConfig
*
* @author Lehman
* @blog https://www.lehman.top
* @since 2024-06-06
*/
@Configuration
public class QuartzConfig implements SchedulerFactoryBeanCustomizer {
@Bean
public Properties properties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
// 对quartz.properties文件进行读取
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
// 在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
System.out.println("配置文件:"+properties().getProperty("org.quartz.dataSource.qzDS.URL"));
schedulerFactoryBean.setQuartzProperties(properties());
return schedulerFactoryBean;
}
/*
* quartz初始化监听器
*/
@Bean
public QuartzInitializerListener executorListener() {
return new QuartzInitializerListener();
}
/*
* 通过SchedulerFactoryBean获取Scheduler的实例
*/
@Bean
public Scheduler scheduler() throws IOException {
return schedulerFactoryBean().getScheduler();
}
/**
* 使用阿里的druid作为数据库连接池
*/
@Override
public void customize(SchedulerFactoryBean schedulerFactoryBean) {
schedulerFactoryBean.setStartupDelay(2); // 启动延迟时间
schedulerFactoryBean.setAutoStartup(true); // 自动启动
schedulerFactoryBean.setOverwriteExistingJobs(true); // 覆盖现有作业
}
}
package com.windseeker.common.quartz.config;
import com.alibaba.druid.pool.DruidDataSource;
import lombok.Data;
import org.quartz.SchedulerException;
import org.quartz.utils.ConnectionProvider;
import java.sql.Connection;
import java.sql.SQLException;
/**
* 使用阿里的druid作为数据库连接池
*
* @author Lehman
* @blog https://www.lehman.top
* @since 2024-06-06
*/
@Data
public class DruidPoolingconnectionProvider implements ConnectionProvider {
// JDBC驱动
public String driver;
// JDBC连接串
public String URL;
// 数据库用户名
public String user;
// 数据库用户密码
public String password;
// 数据库最大连接数
public int maxConnections;
// 数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。
public String validationQuery;
private boolean validateOnCheckout;
private int idleConnectionValidationSeconds;
public String maxCachedStatementsPerConnection;
private String discardIdleConnectionsSeconds;
public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;
public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;
// Druid连接池
private DruidDataSource datasource;
@Override
public Connection getConnection() throws SQLException {
return datasource.getConnection();
}
@Override
public void shutdown() {
datasource.close();
}
@Override
public void initialize() throws SQLException {
if (this.URL == null) {
throw new SQLException("DBPool could not be created: DB URL cannot be null");
}
if (this.driver == null) {
throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
}
if (this.maxConnections < 0) {
throw new SQLException("DBPool maxConnections could not be created: Max connections must be greater than zero!");
}
datasource = new DruidDataSource();
try{
datasource.setDriverClassName(this.driver);
} catch (Exception e) {
try {
throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e);
} catch (SchedulerException ignored) {
}
}
datasource.setUrl(this.URL);
datasource.setUsername(this.user);
datasource.setPassword(this.password);
datasource.setMaxActive(this.maxConnections);
datasource.setMinIdle(1);
datasource.setMaxWait(0);
datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS);
if (this.validationQuery != null) {
datasource.setValidationQuery(this.validationQuery);
if(!this.validateOnCheckout)
datasource.setTestOnReturn(true);
else
datasource.setTestOnBorrow(true);
datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
}
}
}
常量
package com.windseeker.common.quartz;
/**
* Constants
*
* @author Lehman
* @blog https://www.lehman.top
* @since 2024/6/6 11:23
*/
public class ScheduleConstants {
/**
* 参数
*/
public static final String TASK_PARAMS = "PARAMS";
/** 默认 */
public static final String MISFIRE_DEFAULT = "0";
/**
* 忽略错过的执行,按新 Cron 继续运行。
*/
public static final String MISFIRE_IGNORE_MISFIRES = "1";
/**
* 补偿错过的执行,然后继续运行。
*/
public static final String MISFIRE_FIRE_AND_PROCEED = "2";
/**
* 错过的执行不做任何处理,等待下一次 Cron 触发。
*/
public static final String MISFIRE_DO_NOTHING = "3";
}
业务
package com.windseeker.common.quartz.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.windseeker.common.core.utils.StringUtil;
import com.windseeker.common.quartz.dto.JobDTO;
import com.windseeker.common.quartz.mapper.SysJobMapper;
import com.windseeker.common.quartz.pojo.SysJob;
import com.windseeker.common.quartz.service.SysJobService;
import com.windseeker.common.quartz.utils.ScheduleUtils;
import org.quartz.*;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.lang.reflect.InvocationTargetException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
/**
* SysJobServiceImpl
*
* @author Lehman
* @blog https://www.lehman.top
* @since 204-06-06
*/
@Service
public class SysJobServiceImpl extends ServiceImpl<SysJobMapper, SysJob> implements SysJobService {
@Resource
private Scheduler scheduler;
@Override
public void init() throws SchedulerException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
scheduler.clear();
List<SysJob> list = list();
for (int i = 0 ; i < list.size() ; i ++){
SysJob job = list.get(i);
ScheduleUtils.createScheduleJob(scheduler, job);
}
}
@Override
public List<JobDTO> getJobList(String jobName,String jobGroup,String beanTarget) {
LambdaQueryWrapper<SysJob> lq = new LambdaQueryWrapper<>();
if(StringUtil.isNotBlank(jobName)) lq.like(SysJob::getJobName, jobName);
if(StringUtil.isNotBlank(jobGroup)) lq.eq(SysJob::getJobGroup, jobGroup);
if(StringUtil.isNotBlank(beanTarget)) lq.like(SysJob::getBeanTarget, beanTarget);
lq.orderByDesc(SysJob::getCreateDate);
List<SysJob> jobList = list(lq);
List<JobDTO> list = new ArrayList<>();
try{
for (SysJob job : jobList){
JobKey jobKey = JobKey.jobKey(job.getJobId(),job.getJobGroup());
// 获取JobDetail
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
// 获取与Job关联的TriggerKeys
List<? extends Trigger> triggerKeys = scheduler.getTriggersOfJob(jobKey);
JobDTO jobDTO = new JobDTO();
BeanUtils.copyProperties(job,jobDTO);
// 遍历TriggerKeys
for (Trigger triggerKey : triggerKeys) {
// 获取Trigger
Trigger trigger = scheduler.getTrigger(triggerKey.getKey());
// 检查Trigger状态(可能的状态包括NORMAL, PAUSED, COMPLETE, ERROR等)
Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey.getKey());
// 获取上次执行时间(如果触发器尚未触发,则为null)
Date previousFireTime = trigger.getPreviousFireTime();
String previousFireTimeStr = previousFireTime != null ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(previousFireTime) : "N/A";
// 获取下次执行时间(如果触发器不再有效,则为null)
Date nextFireTime = trigger.getNextFireTime();
String nextFireTimeStr = nextFireTime != null ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(nextFireTime) : "N/A";
jobDTO.setJobStatus(triggerState.name());
jobDTO.setJobPevDate(previousFireTimeStr);
jobDTO.setJobNextDate(nextFireTimeStr);
}
list.add(jobDTO);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
return list;
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void insertJob(SysJob job) throws Exception {
if(save(job)){
ScheduleUtils.createScheduleJob(scheduler, job);
}else{
throw new Exception("新增定时任务失败");
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void updateJob(SysJob job) throws Exception {
if(updateById(job)){
// 判断是否存在
JobKey jobKey = JobKey.jobKey(job.getJobId(),job.getJobGroup());
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
}
ScheduleUtils.createScheduleJob(scheduler, job);
}else{
throw new Exception("更新定时任务失败");
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void run(String jobId) throws Exception {
SysJob job = getJob(jobId);
if(job != null){
scheduler.triggerJob(JobKey.jobKey(job.getJobId(),job.getJobGroup()));
}else{
throw new Exception("立即运行定时任务失败,任务对象丢失");
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void rescheduleJob(String jobId) throws Exception {
SysJob job = getJob(jobId);
if(job != null){
TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobId(), job.getJobGroup());
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行,重启触发器
scheduler.rescheduleJob(triggerKey, trigger);
}else{
throw new Exception("重启定时任务失败,任务对象丢失");
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void pauseJob(String jobId) throws Exception {
SysJob job = getJob(jobId);
if(job != null){
scheduler.pauseJob(JobKey.jobKey(job.getJobId(),job.getJobGroup()));
}else{
throw new Exception("暂停定时任务失败,任务对象丢失");
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void resumeJob(String jobId) throws Exception {
SysJob job = getJob(jobId);
if(job != null){
scheduler.resumeJob(JobKey.jobKey(job.getJobId(),job.getJobGroup()));
}else{
throw new Exception("恢复定时任务失败,任务对象丢失");
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void deleteJob(String jobId) throws Exception {
SysJob job = getJob(jobId);
if(job !=null && removeById(jobId)){
scheduler.pauseTrigger(TriggerKey.triggerKey(job.getJobId(),job.getJobGroup()));
scheduler.unscheduleJob(TriggerKey.triggerKey(job.getJobId(),job.getJobGroup()));
scheduler.deleteJob(JobKey.jobKey(job.getJobId(),job.getJobGroup()));
}else{
throw new Exception("删除定时任务失败,任务对象丢失");
}
}
/**
* 根据ID获取调度信息
* @param jobId
* @return
*/
private SysJob getJob(String jobId) {
return getById(jobId);
}
}
VO
package com.windseeker.common.quartz.vo;
import lombok.Data;
/**
* 定时任务DTO
*
* @author Lehman
* @blog https://www.lehman.top
* @since 2024/6/6 11:23
*/
@Data
public class JobVO {
/**
* 定时任务ID
*/
private String jobId;
/**
* 定时任务名称
*/
private String jobName;
/**
* 定时任务组
*/
private String jobGroup;
/**
* 目标bean名
*/
private String beanTarget;
/**
* 目标bean的方法名
*/
private String beanMethodTarget;
/**
* 执行表达式
*/
private String cronExpression;
/**
* 是否并发执行
* 0代表允许并发执行 (默认)
* 1代表不允许并发执行
*/
private Integer concurrent;
/**
* 任务描述
*/
private String jobDescription;
/**
* 创建时间
*/
private String createDate;
/**
* 更新时间
*/
private String updateDate;
/**
* 任务状态
*/
private String jobStatus;
/**
* 上次执行时间
*/
private String jobPevDate;
/**
* 下次执行时间
*/
private String jobNextDate;
}
测试
// 根据任务名称/任务分组/Bean名称获取所有JobDTO
List<JobDTO> list = sysJobService.getJobList(null,null,null);
// 新增任务
SysJob insertJob = new SysJob();
insertJob.setJobId(UUID.randomUUID().toString());
insertJob.setJobName("测试任务");
insertJob.setJobGroup("test");
insertJob.setBeanTarget("testJobBean");
insertJob.setBeanMethodTarget("printData");
insertJob.setCronExpression("0/5 * * * * ?");
insertJob.setMisfirePolicy("1");
sysJobService.insertJob(insertJob);
// 更新任务
SysJob updateJob = new SysJob();
BeanUtils.copyProperties(insertJob,updateJob);
insertJob.setJobName("更新测试任务");
insertJob.setCronExpression("0/2 * * * * ?");
sysJobService.updateJob(updateJob);
// 立即运行任务
sysJobService.run(insertJob.getJobId());
// 重启任务
sysJobService.rescheduleJob(insertJob.getJobId());
// 暂停任务
sysJobService.pauseJob(insertJob.getJobId());
// 恢复任务
sysJobService.resumeJob(insertJob.getJobId());
// 删除任务
sysJobService.deleteJob(insertJob.getJobId());
3 comments
怎么收藏这篇文章?
想想你的文章写的特别好
👍️👍️👍️