探寻用户自定义定时任务的实践方案

导读

工作中会遇到一些由用户自定义定时任务的业务场景,常用的开源框架(如 XXL-Job、Quartz)设计的初衷是给开发人员使用,并不适合开放给用户创建大量的自定义任务。本文借鉴开源框架定时任务作业的思想,结合 j.u.c 的 ScheduledExecutor,提供一种定时任务的实现方法,以解决用户自定义定时任务场景的问题。希望对大家有所帮助。

作者:杨凯 | 网易智企资深开发工程师

用户自定义定时任务

谈到定时任务的实现,我们优先想到的是引入优秀的开源框架方案去解决,常见的开源产品上文也提到过,如Quartz、XXL-Job、ElasticJob 等,但是开源框架应用到用户自定义任务上,存在以下需要问题或不足:

  • 开源框架从任务创建到执行有一套标准方案,用户自定义任务在何时,何地插入符合开源框架标准任务并能控制生效、停止是一个需要考虑的复杂问题。
  • 开源框架(如 XXL-Job)对任务的管理和业务容器是解耦的,如果用户要完成任务的创建、修改需要业务服务反向调用操作任务中心,这不符合任务中心设计原则。
  • 开源框架设计的初衷是给程序开发者创建和控制任务。一般情况下,任务执行的策略、目的都比较明确,不像用户自定义任务存在频繁修改和相同业务背景多个任务定义使用同一个处理逻辑。
  • 开源框架未提供用户友好的任务配置界面。

设计用户自定义任务组件除了要考虑上面的问题,还需要站在用户角度思考用户自定义任务的特点:

  • 开始和结束可控

用户自定义定时任务业务依赖性强,可以多次创建和更新任务,但不会执行,也会在任务执行期间人为停止。所以任务组件要将业务任务创建和作业任务的创建区分,只创建、加载用户确定执行的任务。

  • 执行策略和执行时间对用户友好

程序开发者创建定时任务,执行策略(单个任务循环、单次)和执行时间是由配置的 Cron 表达式确定,但是 Cron 表达式对用户不友好,容易配置出错。用户自定义定时任务在设置定时策略和执行时间时,需要提供用户友好的配置界面,任务组件内部转换成对应的 Cron 表达式。

  • 执行时间范围可控

完成一、二步的配置后,需要给用户提供一个任务执行的时间范围,在这个时间范围内才会执行任务。 简单的用户自定义定时任务的界面如下:

image.png

清楚了用户自定义定时任务的特点,定义任务模型 TaskScheduleDefine 为:

属性 注释
id 任务的唯一标识
busId 业务维度的 ID:可以根据业务背景决定是唯一还是指定
taskName 任务名称
beanName 任务处理类实例名称
cron cron 表达式
startTime 用户定义的开始时间
endTime 用户定义的结束时间
isPermanent 是否永久任务
multiple 是否允许同一时间任务任务并行执行
once 是否单次任务
valid 任务是否有效

定时任务执行周期

定时任务从创建到执行可分为如下阶段:

2.png

  • 创建:界面化的配置(如 XXL-Job),代码配置(如 Quartz,spring-schedule)。
  • 加载:任务加载到应用缓存,可以在创建时进行,但实际上任务创建和加载任务是分开的,比如当任务被修改时,实际上是有一个更新的过程的,可以把这种更新叫做任务的重载。
  • 调度:判断被加载的任务是否满足执行条件(如果支持分布式调度要决定那一台服务器去执行),如果满足,开始执行。
  • 执行:开源框架都会完成上面的三个步骤(调度中心或应用本身),业务开发者只用关注业务逻辑部分,做到任务调度和业务执行解耦。

本文介绍的任务组件也是基于这个思想去实现用户自定义任务。

用户自定义任务设计

应用启动时,初始化任务加载线程和任务调度线程(类似于 XXL-Job 的 scheduleThread 和 ringThread)

//上传+加载,支持本地和数据库任务
uploadAndLoadDefinition();
//初始化调度, 调度由维护任务来处理,由调度任务来唤起相应的具体执行
internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskMonitor(), 10, 45, TimeUnit.SECONDS);
//定义维护 
internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskDefinitionMonitor(), 1, 2, TimeUnit.MINUTES);
复制代码

任务创建

3.png

将业务任务执行和停止与作业任务创建和失效关联,达到用户自定义定时任务的初衷,作业任务完全由用户决定。

任务加载

4.png

任务加载使用 j.u.c 提供的定时任务线程池 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 方法,周期性的触发任务的加载,保证缓存中任务的及时更新。不同的是用户自定义任务一般都是提前创建好的,不需要不间断的去查询,而且可以通过开始和结束时间双重保证任务正确触发。

注册任务部分逻辑:

//获取全部任务列表defineList更新任务
 defineList.forEach(t -> {
        String key = t.getBeanName() + t.getBusId();
        val task = TaskDefinitions.registered(key);
        //没有(并且有效),就添加
        if (task == null) {
            if (t.getValid()) {
                TaskDefinitions.registerTask(new ScheduleTask(t));
                changedList.add(t);
            }
        }
        //有,就替换定义
        else {
            boolean changed = task.updateDefine(t);
            if (changed) {
                changedList.add(t);
            }
        }
    });
    //打印变化的任务日志
}
//ScheduleTask任务定义,updateDefine这个对象的属性
public class ScheduleTask {
    private long id;
    private TaskScheduleDefine localScheduleDefine;
    private CronSequenceGenerator cronGenerator;
    public ScheduleTask(TaskScheduleDefine taskScheduleDefine) {
        this.id = taskScheduleDefine.getId();
        this.localScheduleDefine = taskScheduleDefine;    
    }
}
复制代码

任务调度

5.png

调度任务的部分逻辑:

public class SpringTaskMonitor implements Runnable {
    private static Date DATE_INIT = new Date();
    @Override
    public void run() {
        ExceptionUtils.doActionLogE(this::doRun);
    }
    private void doRun() throws Throwable {
        val taskScheduleDefineMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleDefineMapper.class);
        val taskScheduleRecordMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleRecordMapper.class);
        TaskDefinitions.getTaskMap().values().forEach(t -> {
            //1.无效任务
            if (!t.getLocalScheduleDefine().getValid()) {
                return;
            }
            //2.设置了过期时间
            Date now = new Date();
            if (!t.getLocalScheduleDefine().getIsPermanent()) {
                Date endTime = t.getLocalScheduleDefine().getEndTime();
                if (null == endTime || endTime.before(now)) {
                    TaskDefinitions.getTaskMap().remove(t.getLocalScheduleDefine().getBeanName() + t.getLocalScheduleDefine().getBusId());
                    taskScheduleDefineMapper.updateTaskValid(t.getLocalScheduleDefine().getId(), false);
                    return;
                }
            }

            val lastRecord = taskScheduleRecordMapper.getLast(t.getLocalScheduleDefine().getId());
            Date date = lastRecord == null ? DATE_INIT : lastRecord.getExecuteDate();
            boolean shouldRun = false;
            Date nextDate = t.cronGenerator().next(date);
            //首次执行且执行时间未到重置开始时间
            if (null != t.getLocalScheduleDefine().getStartTime() && nextDate.before(t.getLocalScheduleDefine().getStartTime())) {
                DATE_INIT = new Date();
                log.warn("任务执行时间未到设置的开始时间,重新设置系统时间{},本次任务忽略:{}", DateUtil.formatDate(DATE_INIT, "yyyy-MM-dd HH:mm:ss"), GsonUtil.toJson(t));
                return;
            }
            if (DateUtils.addSeconds(nextDate, 30).before(now)) {
                shouldRun = true;
            }
            if (shouldRun) {
                TaskWork localWork = (TaskWork) ApplicationContextUtils.getReadyApplicationContext().getBean(t.getLocalScheduleDefine().getBeanName());
                SpringTaskExecutor.getExecutorService().submit(() -> localWork.runJob(t));
            }
        });
    }
}
复制代码

上述流程较清晰的还原了任务调度的一些主要逻辑。从任务调度的部分代码中可以看出,整个调度过程异常被捕获,出现异常不会影响下一次的调度执行,任务的 misfire 问题处理策略是:

  • 任务过了用户的设定时间不执行
  • 任务未到用户的设定时间不执行
  • 任务首次执行出了异常(以数据库执行记录为准),以当前时间为触发频率立刻触发一次执行,然后按照 Cron 频率依次执行(类似类似于 Quartz 的默认 withMisfireHandlingInstructionFireAndProceed 模式)
  • 定时任务已有执行记录,以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后,重当下一次触发频率发生时间大于当前时间后,再按照正常的 Cron 频率依次执行(类似于 Quartz的withMisfireHandlingInstructionIgnoreMisfires 模式)

另外,需要考虑的是在同一个业务场景下,用户会创建多个任务定义,但它们执行的业务逻辑是一样的(执行策略,执行时间等不一样)。

6.png

任务执行

任务调度提交的任务给线程池处理,执行前后根据任务定义对任务做一些通用处理(黄色框部分),具体的执行业务逻辑交给接口 LocalWork 实现类的 execute() 方法处理。

7.png

/**
 * description: 辅助来完成默认的localWork方法
 */
public class TaskWorkUtils {
    static void helpRun(TaskWork localWork, ScheduleTask scheduleTask) {
    //部分伪代码如下
    }
 }
//是否任务有执行过
boolean executed = false;
TaskScheduleRecord record = null;
Date executeDate = new Date();
try {
   //根据需要决定是否获取锁后执行(redisLock,zkLock,dbLock都可以,保证任务唯一执行)
   String lockName = localWork.getClass().getSimpleName() + scheduleTask.getLocalScheduleDefine().getBusId();
   //获取不到锁return
   //获取到执行下面逻辑
    record = ExceptionUtils.doFunLogE(() -> {
        TaskScheduleRecord newRecord = buildRecord(scheduleTask, executeDate);
        newRecord.setId(taskRecordService.save(newRecord));
        return newRecord;
    });
    //如果不能保存成功,表示出现了数据库异常,相应状态不能存取,则直接返回,不再执行
    if (record == null) {
        return;
    }
    executed = true;
    localWork.execute(record);
} catch (Throwable throwable) {
    log.error("执行任务时出现异常信息:{}", throwable.getMessage(), throwable);
    e = throwable;
} finally {
    //释放锁:releaseLock()
    //记录异常日志,更新任务状态和失败原因
    if (record != null) { 
        }
    }
    if (!scheduleTask.getLocalScheduleDefine().getOnce()&&executed) {
       Date next = scheduleTask.cronGenerator().next(executeDate);
       long delay = next.getTime() - executeDate.getTime();
       SpringTaskExecutor.getExecutorService().schedule(() -> localWork.runJob(scheduleTask), delay, TimeUnit.MILLISECONDS);
        }
    }
复制代码

如果要保证任务在集群中保证唯一执行可通过分布式锁实现,具体的key已给参考,因为没有提供集群节点注册的功能,负载均衡的调度只能依赖集群中节点获取锁的随机性,即那个节点获取到锁,任务在哪个节点执行。

当任务执行出错时(保存完执行记录后),不影响下一次任务的执行,但会更新此次任务执行的结果和失败原因。

任务设计小结

应用启动时,初始化任务,开启任务加载线程,开启任务调度线程。任务加载线程周期性的从 DB 中获取全部任务,并更新缓存中任务实例;调度线程负责对任务定义实例进行一系列的判断,决定是否交给执行线程池去执行,任务加载和调用可以使用一个定时线程池。

private ScheduledExecutorService internalScheduledExecutor = new ScheduledThreadPoolExecutor(2, 
            new ThreadFactoryBuilder().setNameFormat("task-internal-%d").build());
复制代码

执行任务的线程池接收到提交的任务,执行前后做统一处理,任务执行的具体业务逻辑交给具体的实现类去做。整个处理流程中,需要两张表(任务定义表+任务执行记录表),2 个定时线程池可完成。

总结

本文基于用户自定义定时任务的特点,从任务创建、任务加载、任务调度、任务执行四个方面详细的介绍了任务执行的过程,对定时任务中常见的问题和处理过程附带了部分代码供参考,在支持一般定时任务的同时给大家提供了一种用户自定义定时任务的实践方法。

更多技术干货,欢迎关注【网易智企技术+】微信公众号