WEB:
ExecutorServlet web端执行一个流的入口
1、ajaxExecuteFlow执行这个方法
1、getProjectAjaxByPermission执行这个方法,判断用户是否有权限执行这个工程
2、final ExecutableFlow exflow = FlowUtils.createExecutableFlow(project, flow);获取一个ExecutableFlow对象
3、executorManagerAdapter.submitExecutableFlow(exflow, user.getUserId()); 提交这个flow
ExecutorManager flow的管理类,上面把flow提交到了这个类
1、submitExecutableFlow
1、exflow.isLocked(),判断flow的状态
2、this.queuedFlows,判断放flow的队列是不是满了,满了就报错
3、this.queuedFlows.enqueue(exflow, reference);把flow放到队列中
2、QueueProcessorThread这个线程负责消费queuedFlows队列里面的flow
1、if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) { 根据当前时间和最后一次更新executor的时间与更新时间窗口做对比,已提交的flow跟允许提交的flow数量做对比,满足其一,向exector发送请求,获取exector的状态,包括cpu,mem,上次是否提交flow,以及运行的flow总数
2、if (exflow.getUpdateTime() > lastExecutorRefreshTime) 这块用来判断flow的最后更新时间,跟最后一次刷新executor的时间做对比,如果更新时间晚的话,则sleep到刷新executor状态的时候再进行提交
3、selectExecutorAndDispatchFlow(reference, exflow); 选择executor并且提交flow
1、selectExecutor(exflow, remainingExecutors);
1、getUserSpecifiedExecutor(exflow.getExecutionOptions(),
exflow.getExecutionId() 指定executorId,直接去数据库里面查找wxwcutor是否存在
2、final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
ExecutorManager.this.comparatorWeightsMap); 上述方式没有找到executor的情况下,创建一个ExecutorSelector选择器,
3、choosenExecutor = selector.getBest(availableExecutors, exflow); 根据executor的状态,上述刷新executor状态时候获取到的各项指标,进行对比,选择一个合适的executor
2、dispatch(reference, exflow, selectedExecutor); 找到executor以后进行flow的分配,发送到相应的executor
EXEC:
ExecutorServlet exec端执行一个流的入口
1、handleAjaxExecute
1、this.flowRunnerManager.submitFlow(execId); 具体执行的那个流的id,后面通过这个id去数据库获取flow的具体配置信息。
FlowRunnerManager
1、submitFlow
1、isAlreadyRunning(execId) 判断这个flow是否正在运行
2、final FlowRunner runner = createFlowRunner(execId); 通过execId创建一个FlowRunner对象
3、final Future<?> future = this.executorService.submit(runner); 往线程池里面提交这个FlowRunner对象
4、submitFlowRunner(runner); 提交这个flow去运行
2、submitFlowRunner
1、this.submittedFlows.put(future, runner.getExecutionId()); 往submittedFlows队列里面提交这个flow,submittedFlows只是为了记录已经提交的flow
FlowRunner
1、run(),直接运行run方法
1、setupFlowExecution 添加配置信息
2、runFlow(); 真正的运行这个flow
1、runReadyJob 判断flow里面node的状态,也就是job的状态,从第一个job开始运行
2、runExecutableNode 运行具体的job
1、 prepareJobProperties(node); 准备job的配置文件
2、final JobRunner runner = createJobRunner(node); 创建一个JobRunner对象,
3、this.executorService.submit(runner); executorService是一个指定线程数量的线程池
this.executorService = Executors.newFixedThreadPool(this.numJobThreads,
new ThreadFactoryBuilder().setNameFormat("azk-job-pool-%d").build());
4、this.activeJobRunners.add(runner); activeJobRunners是一个记录正在运行job的队列
JobRunner 提交到executorService线程池以后开始运行
1、run(),直接运行run方法
1、doRun();
1、createAttachmentFile(); 创建job的工作目录
2、createLogger(); 创建一个job运行日志的追加器
3、uploadExecutableNode(); 往数据库中插入正在运行的job
4、prepareJob() 判断job是否准备好
1、finalStatus = runJob(); 运行这个job
2、runJob()
1、this.job.run();
1、执行的是ProcessJob的实现类
ProcessJob
1、run() 执行job的方法,
1、resolveProps(); 解析配置文件
2、this.process.run(); 执行的方法
2、public void run() 一直堵塞直到job执行完成
1、ProcessBuilder 使用java自带的实现来执行cmd
2、LogGobbler 一个获取job日志的方法
判断任务是否执行完成
JobRunner对象生成的时候有一个FlowWatcher对象,监听job的状态
FlowRunner在判断到执行成功的job时,会执行这个方法,finalizeFlow(),如果这个job是这个flow的最下游job的话,那么就把这个flow的状态改成执行成功,如果不是,就拿到job的下游job,继续执行。