[分布式任务调度框架:xxl-job]执行器启动过程简单分析

什么是执行器

  1. 执行器就是指跑任务逻辑的节点
  2. 官方提供了执行器的samples,位于xxl-job\xxl-job-executor-samples

简析启动过程

  1. 以springboot版本的执行器为例子来解析
  • 首先看com.xxl.job.executor.core.config.XxlJobConfig
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        // 设置调度中心的地址
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        // 执行器名字
        xxlJobSpringExecutor.setAppname(appname);
        // 注册到调度中心的地址,如果为空默认使用ip:port
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        // 如果admin配置了accesstoken 那么执行器也需要配置
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
复制代码
  1. XxlJobSpringExecutor
  • XxlJobSpringExecutor是一个典型的spring管理的bean,需要分解一下它的继承实现结构以及它的生命周期方法
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean
复制代码
  • XxlJobExecutor这是执行的关键类,我的理解XxlJobSpringExecutor只是用来适配spring的生命周期来达到XxlJobExecutor生命周期方法的调用
  • 初始化方法
    // start
    // com.xxl.job.core.executor.impl.XxlJobSpringExecutor
    @Override
    public void afterSingletonsInstantiated() {
        // 这一步是初始化被xxl-job注解的方法,这些方法其实就是任务逻辑
        initJobHandlerMethodRepository(applicationContext);
        // 刷新胶水代码的工厂?先保留悬念,我猜它是处理admin上直接编写代码的逻辑
        // 这部分解析放到下一个章节
        GlueFactory.refreshInstance(1);
        try {
            // 调用XxlJobExecutor生命周期的方法
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    // 省略代码N行
    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
       // 1.遍历上下文所有的bean
       // 2.通过MethodIntrospector筛选所有被@XxlJob标注的的方法
       // 3.读取注解的属性,拿到任务名字,初始化方法,销毁方法
       // 4.构造一个MethodJobHandler注册到jobHandlerRepository(一个map变量)
    }
复制代码
  1. XxlJobExecutor
  • 生命周期之start
public void start() throws Exception {
    // 初始化日志文件,如果没有指定,默认是/data/applogs/xxl-job/jobhandler
    XxlJobFileAppender.initLogPath(logPath);
    // 初始化调度中心的列表
    // 本质上就是构造AdminBizClient,然后保存到列表里
    initAdminBizList(adminAddresses, accessToken);
    // 如方法名,日志文件清理线程
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
    // 触发器回调,启动方法里会创建2个线程去执行轮询逻辑
    TriggerCallbackThread.getInstance().start();
    // 初始化内置服务器
    initEmbedServer(address, ip, port, appname, accessToken);
}
复制代码
  • 触发器回调
//com.xxl.job.core.thread.TriggerCallbackThread#start
//省略N行代码
public void start() {
    // 1.判断调度中心列表是否为空,空则推出
    // 2.启动回调线程
    // callback
    // 用一个toStop变量来控制循环是否退出,推出后后面还有一段兜底逻辑,会把
    // 队列里所有剩下的元素拉出来处理
    // 这里主要分析回调是啥
    while(!toStop){
        try {
            // com.xxl.job.core.thread.TriggerCallbackThread#callBackQueue
            // callBackQueue是一个阻塞型的队列
            // HandleCallbackParam (handleCode,handleMsg)
            HandleCallbackParam callback = getInstance().callBackQueue.take();
            if (callback != null) {
                // 一次性取出队列所有元素组装成的一个list取调用doCallback
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
    }
    
    // 启动重试线程,主要是从回调失败的记录文件里拿出记录重新回调
   
}

// 回调的主要代码
private void doCallback(List<HandleCallbackParam> callbackParamList){
    // callback, will retry if error
    // 这里会对所有的调度中心进行回调,只要有一个回调失败就会重试
    // 如果没记错,官方提倡提供一个域名,屏蔽掉调度中心的地址列表
    // 所以这段代码我理解是“不同”的调度中心
    // 回调的逻辑是:一个http请求,至于这个请求会触发admin什么逻辑,以后会分析的
    // 目前猜测是任务执行完毕,回调调度中心,类似一个ACK的东西
    // return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    // 失败的话会插入到失败重试的文件  
}
复制代码
  • 启动的最后一步,EmbedServer start!!!
public void start(final String address, final int port, final String appname, final String accessToken) {
    // ExecutorBiz 这个接口定义了心跳,任务kill,任务运行的接口
    // Impl实现了里面的“真正逻辑”,还有一个实现类主要是http请求ExecutorBizClient
    executorBiz = new ExecutorBizImpl();
    // 构建一条线程
    // 然后启动了一个http服务器,实现是基于Netty(后面也会有netty相对应的学习笔记)
    // 省略N行代码
    // 前面3个handler都是netty内置的,有兴趣的同学可以学一下netty相关的东西
    // 处理空闲连接
    addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
    // http请求的编解码
    addLast(new HttpServerCodec())
    // http对象聚合器
    addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
    // 这个就是xxl-job自己实现的http处理器了,传入刚才的业务处理类和一个max=200的线程池
    addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
复制代码
  • EmbedHttpServerHandler
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // 省略N行代码
    // 丢进线程池去执行,持有ctx可以进行异步写回,不会阻塞netty处理器的线程
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // 这段代码会对请求做分发
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // 序列化对象
            // 写回去
        }
    });
}

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // 省略N行前置验证
    // services mapping
    try {
        // 心跳处理
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {
            // 空闲心跳处理
            // 这个参数里带了个jobid,任务id
            // 利用jobid去查找有没有对应的jobThread,如果没有代表该任务是空闲的
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) {
            // 任务处理
            // TriggerParam 参数有点多,感兴趣的伙伴自己去看下
            // 后面我们会重点讲这段里面的逻辑
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {
            // 停止任务
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            // log请求
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
复制代码
  • com.xxl.job.core.biz.impl.ExecutorBizImpl#run
// 这段代码的分析以第一次执行来备注
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // 获取管理这个jobid的job线程
    // job线程持有一个LinkedBlockingQueue,和一个id集合(集合是用来避免admin重复投递相同的任务的)
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    // 第一次获取不到job线程,所以绑定的job处理器也是空
    // 这个处理器就是最开始初始化的时候,扫描xxljob注解的方法组装的类
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // 这里就根据name直接去jobHandlerRepository获取一个处理器
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // 校验
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        // 变量替换
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } 
    // 省略非Bean模式的代码执行,后面讲glue的时候再回过头来看

    // 第一次进来,所以job线程为null,所以这段逻辑先绕过
    // 后面会回过头来讲这一段东西
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
    
    // 注册一个job线程
    // 注册线程的代码主要是传入jobid和处理器,然后启动(启动代码下面分析,目前猜测它不是马上执行任务的)
    // 同时会把旧的关联jobid的线程拿出来停止和中断
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 把任务推到job线程的队列和集合
    // jobthread启动是不断拉取队列的元素进行消费的
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
复制代码
  • 接下来看JobThread启动后干了什么(com.xxl.job.core.thread.JobThread#run)
// 篇幅原因我会适当省略代码
@Override
public void run() {
  // 初始化方法执行
  // 注意这个地方,这个初始化方法就是@XxlJob的init()
  // jobThread启动的时候才会执行一次,并不是每次执行execute方法都会执行init和destroy
  handler.init();
  // 接下里是一个循环不断从队列中拉取元素
  // toStop依然是一个volatile变量用来控制jobThread是否运行的
  while(!toStop){
    // 接下来会创建一个针对当前日期的当前jobid的日志文件,把本次job的日志打到对应的文本
    // 这也是为什么admin后台可以精准针对每个job的日志rolling了
    // 接下来就是任务执行,先检查传过来的执行超时时间
    if (triggerParam.getExecutorTimeout() > 0) {
        Thread futureThread = null;
        try {
          // 如果存在执行超时时间,其实本质上就是利用一个线程单独去处理,返回future,然后对future结果
          // 获取执行超时设置
          FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
              XxlJobContext.setXxlJobContext(xxlJobContext);
              handler.execute();
              return true;
            }
          });
          futureThread = new Thread(futureTask);
          futureThread.start();
          Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          // 省略N行
        } finally {
          futureThread.interrupt();
        }
      } else {
        // 没超时直接执行
        handler.execute();
      }
      
      // 上面的逻辑都是从队列获取到的元素不为空
      // 那么有个变量 idleTimes 是判断连续30次都从队列获取不到元素,那就推出这个jobthread了
      if (idleTimes > 30) {
        if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost
          XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
        }
      }
      
      // 后面还有一段finally逻辑,会把任务处理结果丢进一个队列,然后有一条异步线程拉取然后http推回admin
      // 就是上文提到的TriggerCallbackThread
      TriggerCallbackThread.pushCallBack
  }
}
复制代码

结尾

  1. 上面留了一些问题,后面的文章会继续解析
  • callback回调后admin做了什么?
  • glue代码是怎么执行的?
  1. 上面简单分析了一整个启动过程,总结一下
  • 我们可以把xxl-job这种分布式任务调度框架简单理解为:每个执行器其实就是一个http服务器,然后依然是请求应答模式,只是现在的请求是admin发出的,然后执行器就执行任务
  • 以往的单点任务调度把任务和调度2件事放在了一起,所以不方便做集群,分布式处理。现在就是调度抽离出去到admin,执行器就做任务处理,分而治之