Switch
A switch task type’s example and dive into information of PyDolphinScheduler.
Example
r"""
A example workflow for task switch.
This example will create four task in single workflow, with three shell task and one switch task. Task switch
have one upstream which we declare explicit with syntax `parent >> switch`, and two downstream automatically
set dependence by switch task by passing parameter `condition`. The graph of this workflow like:
--> switch_child_1
/
parent -> switch ->
\
--> switch_child_2
.
"""
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
with ProcessDefinition(
name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
) as pd:
parent = Shell(name="parent", command="echo parent")
switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
switch_condition = SwitchCondition(
Branch(condition="${var} > 1", task=switch_child_1),
Default(task=switch_child_2),
)
switch = Switch(name="switch", condition=switch_condition)
parent >> switch
pd.submit()
Dive Into
Task Switch.
- class pydolphinscheduler.tasks.switch.Branch(condition: str, task: Task)[source]
Bases:
SwitchBranch
Common condition branch for switch task.
If any condition in
Branch
match, would set thisBranch
’s task as downstream of task switch. If all condition branch do not match would setDefault
’s task as task switch downstream.
- class pydolphinscheduler.tasks.switch.Default(task: Task)[source]
Bases:
SwitchBranch
Class default branch for switch task.
If all condition of
Branch
do not match, task switch would run the tasks inDefault
and setDefault
’s task as switch downstream. Please notice that each switch condition could only have one singleDefault
.
- class pydolphinscheduler.tasks.switch.Switch(name: str, condition: SwitchCondition, *args, **kwargs)[source]
Bases:
Task
Task switch object, declare behavior for switch task to dolphinscheduler.
Param of process definition or at least one local param of task must be set if task switch in this workflow.
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property task_params: Dict
Override Task.task_params for switch task.
switch task have some specials attribute switch, and in most of the task this attribute is None and use empty dict {} as default value. We do not use class attribute _task_custom_attr due to avoid attribute cover.
- class pydolphinscheduler.tasks.switch.SwitchBranch(task: Task, exp: str | None = None)[source]
Bases:
Base
Base class of ConditionBranch of task switch.
It a parent class for
Branch
andDefault
.- get_define(camel_attr: bool = True) Dict [source]
Get
ConditionBranch
definition attribute communicate to Java gateway server.
- _DEFINE_ATTR: set = {'next_node'}
- property condition: str | None
Get task switch property condition.
- property next_node: str
Get task switch property next_node, it return task code when init class switch.
- class pydolphinscheduler.tasks.switch.SwitchCondition(*args)[source]
Bases:
Base
Set switch condition of given parameter.
- get_define(camel_attr=True) Dict [source]
Overwrite Base.get_define to get task Condition specific get define.
- set_define_attr() None [source]
Set attribute to function
get_define()
.It is a wrapper for both And and Or operator.
- _DEFINE_ATTR: set = {'depend_task_list'}