azkaban整体工作流程

发布时间:2022-07-02 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了azkaban整体工作流程脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

1、工作流程

WEB:
 
ExecutorServlet web端执行一个流的入口
  1、ajaxExecuteFlow执行这个方法
    1、getProjectAjaxByPermission执行这个方法,判断用户是否有权限执行这个工程
    2、final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);获取一个ExecutableFlow对象
    3、executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId()); 提交这个flow
ExecutorManager  flow的管理类,上面把flow提交到了这个类
  1、submitExecutableFlow
    1、exflow.isLocked(),判断flow的状态
    2、this.queuedFlows,判断放flow的队列是不是满了,满了就报错
    3、this.queuedFlows.enqueue(exflow, reference);把flow放到队列中
  2、QueueProcessorThread这个线程负责消费queuedFlows队列里面的flow
    1、if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
            || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {  根据当前时间和最后一次更新executor的时间与更新时间窗口做对比,已提交的flow跟允许提交的flow数量做对比,满足其一,向exector发送请求,获取exector的状态,包括cpu,mem,上次是否提交flow,以及运行的flow总数
    2、if (exflow.getUpdateTime() > lastExecutorRefreshTime) 这块用来判断flow的最后更新时间,跟最后一次刷新executor的时间做对比,如果更新时间晚的话,则sleep到刷新executor状态的时候再进行提交
    3、selectExecutorAndDispatchFlow(reference, exflow); 选择executor并且提交flow
      1、selectExecutor(exflow, remainingExecutors);
         1、getUserSpecifiedExecutor(exflow.getExecutionOptions(),
              exflow.getExecutionId() 指定executorId,直接去数据库里面查找wxwcutor是否存在
         2、final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
            ExecutorManager.this.comparatorWeightsMap); 上述方式没有找到executor的情况下,创建一个ExecutorSelector选择器,
         3、choosenExecutor = selector.getBest(availableExecutors, exflow); 根据executor的状态,上述刷新executor状态时候获取到的各项指标,进行对比,选择一个合适的executor
      2、dispatch(reference, exflow, selectedExecutor); 找到executor以后进行flow的分配,发送到相应的executor
 
 
EXEC:
 
ExecutorServlet exec端执行一个流的入口
  1、handleAjaxExecute
    1、this.flowRunnerManager.submitFlow(execId); 具体执行的那个流的id,后面通过这个id去数据库获取flow的具体配置信息。
FlowRunnerManager
  1、submitFlow
    1、isAlreadyRunning(execId) 判断这个flow是否正在运行
    2、final FlowRunner runner = createFlowRunner(execId); 通过execId创建一个FlowRunner对象
    3、final Future<?> future = this.executorService.submit(runner); 往线程池里面提交这个FlowRunner对象
    4、submitFlowRunner(runner); 提交这个flow去运行
  2、submitFlowRunner
    1、this.submittedFlows.put(future, runner.getExecutionId()); 往submittedFlows队列里面提交这个flow,submittedFlows只是为了记录已经提交的flow
FlowRunner
  1、run(),直接运行run方法
    1、setupFlowExecution 添加配置信息
    2、runFlow(); 真正的运行这个flow
      1、runReadyJob 判断flow里面node的状态,也就是job的状态,从第一个job开始运行
      2、runExecutableNode 运行具体的job
        1、 prepareJobProperties(node); 准备job的配置文件
        2、final JobRunner runner = createJobRunner(node); 创建一个JobRunner对象,
        3、this.executorService.submit(runner); executorService是一个指定线程数量的线程池
        this.executorService = Executors.newFixedThreadPool(this.numJobThreads,
            new ThreadFactoryBuilder().setNameFormat("azk-job-pool-%d").build());
        4、this.activeJobRunners.add(runner); activeJobRunners是一个记录正在运行job的队列
JobRunner 提交到executorService线程池以后开始运行
  1、run(),直接运行run方法
    1、doRun();
      1、createAttachmentFile(); 创建job的工作目录
      2、createLogger(); 创建一个job运行日志的追加器
      3、uploadExecutableNode(); 往数据库中插入正在运行的job
      4、prepareJob() 判断job是否准备好
        1、finalStatus = runJob(); 运行这个job
  2、runJob()
    1、this.job.run();
      1、执行的是ProcessJob的实现类
ProcessJob
  1、run() 执行job的方法,
    1、resolveProps(); 解析配置文件
    2、this.process.run(); 执行的方法
  2、public void run()  一直堵塞直到job执行完成
    1、ProcessBuilder 使用java自带的实现来执行cmd
    2、LogGobbler 一个获取job日志的方法
    
判断任务是否执行完成
JobRunner对象生成的时候有一个FlowWatcher对象,监听job的状态
FlowRunner在判断到执行成功的job时,会执行这个方法,finalizeFlow(),如果这个job是这个flow的最下游job的话,那么就把这个flow的状态改成执行成功,如果不是,就拿到job的下游job,继续执行。
      

 

脚本宝典总结

以上是脚本宝典为你收集整理的azkaban整体工作流程全部内容,希望文章能够帮你解决azkaban整体工作流程所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。
标签:数据库