Condition
An example of the condition task type, followed by a closer look at its PyDolphinScheduler API.
Example
r"""
An example workflow for the condition task.
This example creates six tasks in a single workflow: five shell tasks and one condition task. The
condition task's three upstream tasks are set automatically from the parameter `condition`, and its two
downstream tasks are set from the parameters `success_task` and `failed_task`. The graph of this workflow
looks like:
pre_task_1 ->                     -> success_branch
             \                  /
pre_task_2 ->  -> conditions ->
             /                  \
pre_task_3 ->                     -> fail_branch
.
"""
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
from pydolphinscheduler.tasks.shell import Shell
with Workflow(name="task_condition_example", tenant="tenant_exists") as workflow:
    pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
    pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
    pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
    cond_operator = And(
        And(
            SUCCESS(pre_task_1, pre_task_2),
            FAILURE(pre_task_3),
        ),
    )
    success_branch = Shell(name="success_branch", command="echo success_branch")
    fail_branch = Shell(name="fail_branch", command="echo fail_branch")
    condition = Condition(
        name="condition",
        condition=cond_operator,
        success_task=success_branch,
        failed_task=fail_branch,
    )
    workflow.submit()
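To make the meaning of the nested operators concrete, the sketch below models how a condition like the one above might choose a branch from the final task states. It is a simplified stand-in that does not import PyDolphinScheduler: `success`, `failure`, `all_of`, `any_of` and `pick_branch` are hypothetical helpers, and it assumes that each status entry is compared with the task's final state and that the enclosing operator combines those comparisons.

```python
from typing import Callable, Dict, List, Tuple, Union

Item = Tuple[str, str]                    # (task name, expected final state)
Check = Callable[[Dict[str, str]], bool]  # evaluates a condition against task states
Part = Union[List[Item], Check]


def success(*tasks: str) -> List[Item]:
    """Hypothetical stand-in for SUCCESS: expect each task to end in SUCCESS."""
    return [(task, "SUCCESS") for task in tasks]


def failure(*tasks: str) -> List[Item]:
    """Hypothetical stand-in for FAILURE: expect each task to end in FAILURE."""
    return [(task, "FAILURE") for task in tasks]


def _combine(parts: Tuple[Part, ...], reducer: Callable[[List[bool]], bool]) -> Check:
    def check(states: Dict[str, str]) -> bool:
        results: List[bool] = []
        for part in parts:
            if callable(part):
                # Nested operator: evaluate it as a single boolean.
                results.append(part(states))
            else:
                # Status entries: one comparison per listed task.
                results.extend(states[task] == expected for task, expected in part)
        return reducer(results)

    return check


def all_of(*parts: Part) -> Check:
    """Hypothetical stand-in for And."""
    return _combine(parts, all)


def any_of(*parts: Part) -> Check:
    """Hypothetical stand-in for Or."""
    return _combine(parts, any)


def pick_branch(cond: Check, states: Dict[str, str]) -> str:
    """Return the name of the branch that would run for the given task states."""
    return "success_branch" if cond(states) else "fail_branch"


# Same shape as cond_operator in the example above.
cond = all_of(all_of(success("pre_task_1", "pre_task_2"), failure("pre_task_3")))

states = {"pre_task_1": "SUCCESS", "pre_task_2": "SUCCESS", "pre_task_3": "FAILURE"}
print(pick_branch(cond, states))  # success_branch
```

Under these assumptions, the example's condition selects `success_branch` only when `pre_task_1` and `pre_task_2` succeed and `pre_task_3` fails; any other combination selects `fail_branch`.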
Dive Into
Task Conditions.
- class pydolphinscheduler.tasks.condition.And(*args)
  Bases: ConditionOperator
  Operator And for task condition.
  It accepts both Task and subclasses of ConditionOperator, and applies an AND condition to those arguments.
- class pydolphinscheduler.tasks.condition.Condition(name: str, condition: ConditionOperator, success_task: Task, failed_task: Task, *args, **kwargs)
  Bases: Task
  Task condition object, declaring the behavior of a condition task to DolphinScheduler.
  - _downstream_task_codes: Set[int]
  - _task_relation: Set[TaskRelation]
  - _timeout: timedelta
  - _upstream_task_codes: Set[int]
  - property condition_result: Dict
    Get the condition result definition for the Java gateway.
  - property task_params: Dict
    Override Task.task_params for the Condition task.
    The Condition task has a special attribute, dependence. In most other tasks this attribute is None and an empty dict {} is used as its default value. The class attribute _task_custom_attr is not used here, to avoid overwriting attributes.
- class pydolphinscheduler.tasks.condition.ConditionOperator(*args)
  Bases: Base
  Set ConditionTask or ConditionOperator with a specific operator.
  - get_define(camel_attr=True) -> Dict
    Override Base.get_define to return the definition specific to the Condition task.
  - set_define_attr() -> str
    Set the attribute used by get_define(). It is a wrapper for both the And and Or operators.
  - _DEFINE_ATTR: set = {'relation'}
  - property relation: str
    Get the operator name for the concrete class, for use by get_define().
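As a rough illustration of how a `relation` property and a `get_define()` method can work together, the sketch below builds a nested dict from toy operator classes. These are not the library's classes: the toy `And`, `Or`, `SUCCESS`, `FAILURE`, `Operator` and `Status` are re-declared here so the block runs on its own, and the output keys `items`, `children` and `task_code` are hypothetical names chosen for the sketch, so the payload actually sent to the Java gateway may be shaped differently. Rejecting arguments of mixed kinds is likewise a choice made for this sketch.

```python
from typing import Any, Dict, List


class Status:
    """Toy status: remembers task codes and reports its class name as the state."""

    def __init__(self, *task_codes: int) -> None:
        self.task_codes = task_codes

    def items(self) -> List[Dict[str, Any]]:
        # One entry per task, tagged with the expected state (hypothetical keys).
        return [{"task_code": code, "status": type(self).__name__} for code in self.task_codes]


class SUCCESS(Status):
    pass


class FAILURE(Status):
    pass


class Operator:
    """Toy operator: the relation name is derived from the concrete class name."""

    def __init__(self, *args: Any) -> None:
        self.args = args

    @property
    def relation(self) -> str:
        return type(self).__name__.upper()

    def get_define(self) -> Dict[str, Any]:
        define: Dict[str, Any] = {"relation": self.relation}
        if all(isinstance(arg, Status) for arg in self.args):
            # Leaf level: flatten every status into a single list of items.
            define["items"] = [item for arg in self.args for item in arg.items()]
        elif all(isinstance(arg, Operator) for arg in self.args):
            # Nested level: serialize each child operator recursively.
            define["children"] = [arg.get_define() for arg in self.args]
        else:
            raise TypeError("arguments must be all statuses or all operators")
        return define


class And(Operator):
    pass


class Or(Operator):
    pass


definition = And(And(SUCCESS(1, 2), FAILURE(3))).get_define()
print(definition["relation"], definition["children"][0]["items"])
```

Deriving `relation` from the class name means a subclass such as the toy `Or` needs no body of its own to serialize as `"OR"`.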
- class pydolphinscheduler.tasks.condition.FAILURE(*tasks)
  Bases: Status
  Class FAILURE for task condition, a subclass of Status.
- class pydolphinscheduler.tasks.condition.Or(*args)
  Bases: ConditionOperator
  Operator Or for task condition.
  It accepts both Task and subclasses of ConditionOperator, and applies an OR condition to those arguments.
- class pydolphinscheduler.tasks.condition.SUCCESS(*tasks)
  Bases: Status
  Class SUCCESS for task condition, a subclass of Status.
- class pydolphinscheduler.tasks.condition.Status(*tasks)
  Bases: Base
  Base class of Condition task status.
  It is the parent class of SUCCESS and FAILURE, and provides the status name and get_define() to its subclasses.
YAML file example
# Define the workflow
workflow:
  name: "Condition"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
  - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
  - { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
  - { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
  - { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }

  - name: condition
    task_type: Condition
    success_task: success_branch
    failed_task: fail_branch
    op: AND
    groups:
      - op: AND
        groups:
          - task: pre_task_1
            flag: true
          - task: pre_task_2
            flag: true
          - task: pre_task_3
            flag: false
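
The nested `groups` entries in this file mirror the nested operator calls in the Python example. As a hedged illustration, the sketch below walks a Python dict equivalent to the `condition` entry and lists each referenced task with the state it is expected to reach. The mapping of `flag: true` to SUCCESS and `flag: false` to FAILURE is an assumption inferred from comparing this file with the Python example, and `expected_states` is a hypothetical helper, not part of PyDolphinScheduler.

```python
from typing import Any, Dict, List, Tuple

# Dict equivalent of the "condition" task entry in the YAML file.
condition_task: Dict[str, Any] = {
    "name": "condition",
    "task_type": "Condition",
    "success_task": "success_branch",
    "failed_task": "fail_branch",
    "op": "AND",
    "groups": [
        {
            "op": "AND",
            "groups": [
                {"task": "pre_task_1", "flag": True},
                {"task": "pre_task_2", "flag": True},
                {"task": "pre_task_3", "flag": False},
            ],
        }
    ],
}


def expected_states(node: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Collect (task name, expected state) pairs from a nested groups structure."""
    if "task" in node:
        # Leaf entry; assumed mapping: flag true -> SUCCESS, flag false -> FAILURE.
        return [(node["task"], "SUCCESS" if node["flag"] else "FAILURE")]
    pairs: List[Tuple[str, str]] = []
    for child in node.get("groups", []):
        pairs.extend(expected_states(child))
    return pairs


for task, state in expected_states(condition_task):
    print(f"{task}: {state}")
```

The pairs this produces name the same three upstream tasks, with the same expected states, as `SUCCESS(pre_task_1, pre_task_2)` and `FAILURE(pre_task_3)` in the Python example.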