用于Airflow,通过手动触发的自动化工作流程,集成多个节点,实时监控DAG运行状态,确保及时处理失败或超时情况,提升工作效率和可靠性。
该工作流适用于以下人群:
- 数据工程师:需要自动化管理 Airflow DAG 运行的任务。
- DevOps 工程师:希望通过 API 整合工作流与 Airflow 进行交互。
- 项目经理:需要监控 DAG 运行状态和结果,以便做出及时决策。
- 开发者:希望在项目中实现复杂的工作流逻辑,以提高工作效率。
此工作流解决了以下问题:
- 手动触发 DAG 运行:用户可以通过 API 手动触发 Airflow 的 DAG 运行。
- 状态监控:实时监控 DAG 的运行状态(如:queued
、running
、success
、failed
),并根据状态做出相应处理。
- 超时处理:设置等待时间,若超时则自动终止运行并返回错误信息。
- 结果获取:从 DAG 运行中提取任务的结果,便于后续处理。
工作流过程的详细说明如下:
1. 初始化参数:使用 ExecuteWorkflowTrigger
节点接收输入参数(如:dag_id
、task_id
、conf
、wait
和 wait_time
)。
2. 设置 API 前缀:通过 Set
节点定义 Airflow API 的基础 URL。
3. 触发 DAG 运行:使用 HTTP Request
节点向 Airflow API 发送 POST 请求,启动指定的 DAG 运行。
4. 获取 DAG 运行状态:通过 HTTP Request
节点查询 DAG 运行的状态,并判断状态是否为 queued
。
5. 等待处理:若状态为 queued
,则进入 Wait
节点,等待指定的时间。
6. 状态判断:使用 Switch
节点根据 DAG 的状态进行分支处理,分别处理不同的状态(如:成功、失败、运行中等)。
7. 计数与超时判断:通过 Code
节点记录等待次数,并判断是否超过最大等待时间,若超过则返回超时错误。
8. 获取任务结果:若 DAG 运行成功,使用 HTTP Request
节点获取任务的返回值结果。
用户可以通过以下方式自定义和调整此工作流:
- 修改输入参数:在 ExecuteWorkflowTrigger
节点中添加或修改输入参数,以适应不同的 DAG 运行需求。
- 调整 API URL:在 Set
节点中修改 Airflow API 的基础 URL,以连接到不同的 Airflow 实例。
- 更改状态处理逻辑:在 Switch
节点中添加或修改状态条件,以实现特定状态的处理逻辑。
- 修改超时设置:在 If count > wait_time
节点中调整等待时间和最大重试次数,以适应不同的业务需求。