airflow dag_run

用于Airflow,通过手动触发的自动化工作流程,集成多个节点,实时监控DAG运行状态,确保及时处理失败或超时情况,提升工作效率和可靠性。

2025/7/8
12 个节点
复杂
yseldq3zfwb0levn手动复杂stopanderrorexecuteworkflowtriggerwait高级api集成逻辑条件路由
分类:
Complex WorkflowManual TriggeredBusiness Process Automation
集成服务:
StopAndErrorExecuteWorkflowTriggerWait

适用人群

该工作流适用于以下人群:
- 数据工程师:需要自动化管理 Airflow DAG 运行的任务。
- DevOps 工程师:希望通过 API 整合工作流与 Airflow 进行交互。
- 项目经理:需要监控 DAG 运行状态和结果,以便做出及时决策。
- 开发者:希望在项目中实现复杂的工作流逻辑,以提高工作效率。

解决的问题

此工作流解决了以下问题:
- 手动触发 DAG 运行:用户可以通过 API 手动触发 Airflow 的 DAG 运行。
- 状态监控:实时监控 DAG 的运行状态(如:queuedrunningsuccessfailed),并根据状态做出相应处理。
- 超时处理:设置等待时间,若超时则自动终止运行并返回错误信息。
- 结果获取:从 DAG 运行中提取任务的结果,便于后续处理。

工作流程

工作流过程的详细说明如下:
1. 初始化参数:使用 ExecuteWorkflowTrigger 节点接收输入参数(如:dag_idtask_idconfwaitwait_time)。
2. 设置 API 前缀:通过 Set 节点定义 Airflow API 的基础 URL。
3. 触发 DAG 运行:使用 HTTP Request 节点向 Airflow API 发送 POST 请求,启动指定的 DAG 运行。
4. 获取 DAG 运行状态:通过 HTTP Request 节点查询 DAG 运行的状态,并判断状态是否为 queued
5. 等待处理:若状态为 queued,则进入 Wait 节点,等待指定的时间。
6. 状态判断:使用 Switch 节点根据 DAG 的状态进行分支处理,分别处理不同的状态(如:成功、失败、运行中等)。
7. 计数与超时判断:通过 Code 节点记录等待次数,并判断是否超过最大等待时间,若超过则返回超时错误。
8. 获取任务结果:若 DAG 运行成功,使用 HTTP Request 节点获取任务的返回值结果。

自定义指南

用户可以通过以下方式自定义和调整此工作流:
- 修改输入参数:在 ExecuteWorkflowTrigger 节点中添加或修改输入参数,以适应不同的 DAG 运行需求。
- 调整 API URL:在 Set 节点中修改 Airflow API 的基础 URL,以连接到不同的 Airflow 实例。
- 更改状态处理逻辑:在 Switch 节点中添加或修改状态条件,以实现特定状态的处理逻辑。
- 修改超时设置:在 If count > wait_time 节点中调整等待时间和最大重试次数,以适应不同的业务需求。