Conditionally execute multiple branches one-by-one

We have an unusual multiplexer-like use-case in our workflow

                                +-----------------------+
                                |                       |
                  +------------>+  branch-1.begin-task  |
                  |             |                       |
                  |             +-----------------------+
                  |
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-2.begin-task  |
                  |             |                       |
+------------+    |             +-----------------------+
|            |    |
|  MUX-task  +----+                         +
|            |    |                         |
+------------+    |                         |
                  |                         |
                  +- -- -- -- ->            |
                  |                         |
                  |                         |
                  |                         +
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-n.begin-task  |
                                |                       |
                                +-----------------------+

The flow is expected to work as follows

  • MUX-task listens for events on an external queue (single queue)
  • each event on the queue triggers execution of one of the branches (branch-n.begin-task)
  • one-by-one, as events arrive, the MUX-task must trigger execution of the respective branch
  • once all branches have been triggered, the MUX-task completes

Assumptions

  • Exactly n events arrive on the queue, one for triggering each branch
  • n is dynamically known: its value is defined in a Variable

Limitations

  • there is only one external queue where events arrive
  • we can't have n queues (one per branch) since the set of branches grows over time (n is dynamically defined)

We are not able to come up with a solution within Airflow's set of operators and sensors (or anything else available out-of-the-box in Airflow) to build this

  1. Sensors can be used for listening for events on the external queue; but we have to listen for multiple events, not just one
  2. BranchPythonOperator can be used to trigger execution of a single branch out of many, but it immediately marks the remaining branches as skipped (see the sketch below)
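
For illustration, here's a minimal sketch of why a plain BranchPythonOperator doesn't fit (Airflow 1.10-style imports; poll_external_queue() and the Variable name n_branches are our hypothetical stand-ins): the callable returns exactly one task_id, and Airflow immediately marks every other downstream branch as skipped.

    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    dag = DAG("mux_dag", start_date=datetime(2020, 1, 1), schedule_interval=None)

    def pick_branch(**context):
        # waits for the FIRST event only; returning one task_id makes
        # Airflow skip all the other branches right away -- hence the problem
        event = poll_external_queue()  # hypothetical helper
        return "branch-{}.begin-task".format(event["branch_id"])

    mux = BranchPythonOperator(task_id="MUX-task", python_callable=pick_branch,
                               provide_context=True, dag=dag)

    n = int(Variable.get("n_branches"))  # n is defined in a Variable
    for i in range(1, n + 1):
        mux >> DummyOperator(task_id="branch-{}.begin-task".format(i), dag=dag)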

Primary bottleneck

Because of the 2nd limitation above, even a custom operator combining the functionality of a Sensor and a BranchPythonOperator won't work.

We have tried to brainstorm around a fancy combination of Sensors, DummyOperator and trigger_rules to achieve this, but have had no success thus far.

Is this doable in Airflow?

UPDATE-1

Here's some background info to understand the context of the workflow

  • we have an ETL pipeline to sync MySQL tables (across multiple Aurora databases) to our data-lake
  • to overcome the impact of our sync pipeline on production databases, we have decided to do this
    • for each database, create a snapshot (restore AuroraDB cluster from last backup)
    • run MySQL sync pipeline using that snapshot
    • at the end of sync, terminate the snapshot (AuroraDB cluster)
    • single queue for all databases
    • this setup was done by our DevOps team (different AWS account, we don't have access to the underlying Lambdas / SQS / infra)

XCOM to the rescue!

We decided to model the tasks as follows (both tasks are custom operators)

  • The MUX-task is more like an iterative sensor: it keeps listening for events on the queue and takes some action against each event arriving on the queue
  • All branch-x.begin-tasks are simple sensors: they listen for the publishing of an XCOM (whose name is in a pre-defined specific format)

The workflow is as follows

  • The MUX-task listens for events on the queue (the listening part is enclosed in a for-loop with as many iterations as the number of branches)
  • When an event arrives, the MUX-task picks it up; it identifies which 'branch' should be triggered and publishes an XCOM for the respective branch
  • The respective branch's sensor picks up that XCOM on its next poke and the branch starts executing. In effect, the branch's sensor merely acts as a gateway that opens up with an external event (XCOM) and allows execution of the branch

Since there are so many sensors (one per branch), we would most likely employ mode='reschedule' to overcome deadlocks
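
Concretely, here's a minimal sketch of the two custom operators, assuming Airflow 1.10-style imports, a hypothetical read_event_from_queue() helper, a Variable named n_branches, and an XCOM-key convention of trigger.<branch-id> (all of these names are ours, not the actual implementation):

    from airflow.models import BaseOperator, Variable
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults

    class MuxOperator(BaseOperator):
        """Iterative-sensor-like task: consumes n events off the external
        queue, one-by-one, and publishes one XCOM per event."""

        def execute(self, context):
            n = int(Variable.get("n_branches"))  # n is defined in a Variable
            for _ in range(n):
                event = read_event_from_queue()  # hypothetical blocking read
                # XCOM key follows the pre-defined format 'trigger.<branch-id>'
                self.xcom_push(context,
                               key="trigger.{}".format(event["branch_id"]),
                               value=True)

    class BranchGateSensor(BaseSensorOperator):
        """begin-task of each branch: pokes until MUX-task has published
        the XCOM for this branch, then lets the branch execute."""

        @apply_defaults
        def __init__(self, branch_id, mux_task_id="MUX-task", **kwargs):
            super(BranchGateSensor, self).__init__(**kwargs)
            self.branch_id = branch_id
            self.mux_task_id = mux_task_id

        def poke(self, context):
            # xcom_pull returns None until MUX-task publishes the key
            return context["ti"].xcom_pull(
                task_ids=self.mux_task_id,
                key="trigger.{}".format(self.branch_id)) is not None

Each BranchGateSensor instance would then be created with mode='reschedule', so its worker slot is freed between pokes.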

  • Since the described approach relies heavily on polling, we don't deem it to be super efficient.
  • A reactive, trigger-based approach would be more desirable, but we haven't been able to work it out

UPDATE-2

  • Looks like the 'reactive' approach is achievable if we could model each branch as a separate DAG and, instead of publishing XCOMs for each branch, trigger the branch's DAG just like TriggerDagRunOperator does (see the sketch after this list)
  • But since our monolithic DAG is generated programmatically via complex logic, this change would have been quite hard (lots of code rewrite). So we decided to continue with the poll-based approach and live with a few minutes of extra delay in a pipeline that already takes several hours to complete
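
For the record, here's roughly what the reactive variant could have looked like, assuming each branch lives in its own DAG with a hypothetical id like branch_3_dag and the same hypothetical read_event_from_queue() helper; trigger_dag() is the experimental API that recent Airflow 1.10 releases of TriggerDagRunOperator call under the hood:

    from airflow.api.common.experimental.trigger_dag import trigger_dag
    from airflow.utils import timezone

    def mux_loop(n):
        """Reactive variant: one trigger_dag() call per incoming event."""
        for _ in range(n):
            event = read_event_from_queue()  # hypothetical blocking read
            now = timezone.utcnow()
            # fire the branch's own DAG, essentially what
            # TriggerDagRunOperator does for a fixed dag_id
            trigger_dag(dag_id="branch_{}_dag".format(event["branch_id"]),
                        run_id="mux__{}".format(now.isoformat()),
                        execution_date=now)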

UPDATE-3

[Refer to the UPDATE-1 section of the question]

Since our actual implementation only required us to wait for the creation of the database, we decided to simplify the workflow as follows

  • database endpoints were fixed via DNS (they didn't change every time the Aurora snapshot was restored)
  • we did away with the MUX-task (and with it the SQS queue for Aurora restore lifecycle events)
  • each branch's begin-task branch-x.begin-task was modelled as a simple sensor that tries firing a dummy SQL query (SELECT 1) to check whether the database endpoint has become active (see the sketch below)
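
One way such a begin-task gate might be sketched (Airflow 1.10-style imports; the connection id branch_3_mysql is hypothetical): note that a stock SqlSensor would typically fail the task on a refused connection instead of poking again, so this sketch swallows connection errors and treats an unreachable endpoint as 'not up yet'.

    from airflow.hooks.mysql_hook import MySqlHook
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults

    class DbEndpointSensor(BaseSensorOperator):
        """Pokes the branch's database with a dummy query until it responds."""

        @apply_defaults
        def __init__(self, mysql_conn_id, **kwargs):
            super(DbEndpointSensor, self).__init__(**kwargs)
            self.mysql_conn_id = mysql_conn_id

        def poke(self, context):
            try:
                # dummy query: succeeds once the endpoint is active
                MySqlHook(mysql_conn_id=self.mysql_conn_id).get_first("SELECT 1")
                return True
            except Exception:
                return False  # endpoint not reachable yet; keep poking

    # hypothetical usage: one gate per branch, poking the fixed DNS endpoint
    begin_task = DbEndpointSensor(task_id="branch-3.begin-task",
                                  mysql_conn_id="branch_3_mysql",
                                  mode="reschedule", poke_interval=300,
                                  dag=dag)  # dag defined elsewhere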