Airflow 的运行流程

对于Airflow的使用来说了解整个Airflow的运行流程是非常重要的,否则就会出现很多无法预料的行为,关于这部分,本少爷也是踩了很多的坑。

DAG,TASK和他们的实例

首先要明白一个概念,无论是DAG,还是TASK都是一个描述一个抽象的逻辑。
真正在某个时间点运行的DAG或TASK才是运行的实例。

在Airflow中,DAG的实例叫做Dag Run, Task的实例叫做Task Instance

Airflow 执行组件

Airflow的调度和执行流程中有两个核心的组件

  • Scheduler:这个是整个Airflow的调度器,Airflow所有DAG的调度过程是由Scheduler轮询来处理的。触发条件达到后,会丢给Executor执行。

  • Executor:现在的Executor有三种:

    1. SequnceExecutor:提供本地执行,并且串行执行一个DAG中的所有Task,基本上只用在初期的Airflow概念验证阶段

    2. LocalExecutor:这个是比较常用的Executor,可以在本地并行执行一个DAG内的所有Task

    3. CeleryExecutor:这个是在大型任务调度场景,或者是表较复杂的任务分离场景中需要用到的Executor。顾名思义,在这个Executor下,Airflow使用了Celery这个强大的Python分布式队列框架去分发任务,然后在这样的环境下,需要在执行任务的机器上启用Airflow Worker来处理队列中的请求。

      在一个Airflow中同时只能一个Executor启动,不能给指定的DAG指定Executor

  • Pool:这个Pool虽然不是Airflow的核心,但也跟整个Airflow的执行流程相关。任何一个Task其实都是指定了Pool这个参数的,即使没有自己指定,其实也是归结到了Default Pool这么个池子中。Pool本身是个抽象的概念,由Slot组成,可以建立任何一个Pool,指定Slot的数量。任何一个使用了这个Pool的Task Instance就需要占用一个Slot,Slot用完了,Task就处于等待状态。

Airflow 执行参数

在整个Airflow的执行流程中,有几个参数,控制了整个调度流程的并行度,但是在文档中却没有好好的写明白。

  1. parallelism:这个参数指定了整个Airflow系统,在任何一刻能同时运行的Task Instance的数量,这个数量跟DAG无关,只跟Executor和Task有关。举个例子:如果parallelism=15, 这时你有两个DAG,A和B,如果A需要同时开跑10个Task,B也要同时开跑10个Task,两个DAG同时触发,那么这时候同时在跑的Task数量只能是15,其余的5个会等之前的Task运行完了触发,这时的状态不会显示在web上。而且在这种情况下,触发的顺序是不确定的。

  2. dag_concurrency:这个参数指定了同一个Dag Run中能同时运行的Task Instance的个数

  3. max_active_runs_per_dag:这个参数指定了同一个Dag能被同时激活的Dag Run的数量

  4. non_pooled_task_slot_count:这个参数指定了默认的Pool能同时运行的Task Instance的数量,如果你的Task没有指定Pool选项,那么这个Task就是属于这个默认的Pool的

Airflow 执行状态

对于Airflow来说,Dag Run和Task Instance都有自己的执行状态,而且这两者的执行状态不关联,也就是说有可能某一个Dag Run是Success的,但是这个Dag Run里的Task Instance确是Failed或者无状态的,反之亦然。

怎么会出现这种情况呢?一般来说,正常的调度行为下,这种情况是不会出现的,但是如果说我们的Dag写错了,Task跑错了呢?

  • 错误的处理方法:直接在Dag Run的菜单中删除这个跑错的Dag Run,然后让调度器重跑,或者Backfill它

    但这时,实际上这个Dag Run跑过的Task Instance的状态还在数据库中,于是实际上根本就没有运行Task就调度器就自动判断跑完了。

    直接删除Task Instance也是一样的情况,调度器会认为这个Dag Run是Success的状态所以就不跑它。但这时可以Backfill

  • 所以正确的做法是使用Clear

    当我们Clear一个Task Instance时,这个Task Instance所属的Dag Run的状态会立即被置为Running,这样调度器就会认为这个Dag Run要继续跑。

    当然,如果我们同时删除了一批Task Instance和它们所属的Dag Run的话,调度器也会正常的重新开始执行,实际上这样的操作方式,在界面上更容易一点。

在清空状态或重跑时,暂停当前Dag的调度是比较靠谱的,否则会出现,清空到一半,当中的某个任务已经开始被调度的情况,所以最好全部清空完毕后,再打开调度器。

One more thing,还有非常重要的一点,如果当前有Task Instance在运行,这时我们如果删除了这个Task Instance的状态或者Clear它的状态,实际在后台运行着的任务并不会停止!所以需要手工Kill这个任务的运行,然后这时Scheduler进程收到了子进程(我们的运行的Task)异常退出的状态,就会把这个Task Instance的任务状态重新写成Failed,然后我们就又要清空一遍,所以在重跑任务前,一定要先停止调度,然后Kill当前正在运行的任务进程,最后清空任务状态