Switch

A switch task type’s example and dive into information of PyDolphinScheduler.

Example

r"""
A example workflow for task switch.

This example will create four task in single workflow, with three shell task and one switch task. Task switch
have one upstream which we declare explicit with syntax `parent >> switch`, and two downstream automatically
set dependence by switch task by passing parameter `condition`. The graph of this workflow like:
                      --> switch_child_1
                    /
parent -> switch ->
                    \
                      --> switch_child_2
.
"""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition

with ProcessDefinition(
    name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
) as pd:
    parent = Shell(name="parent", command="echo parent")
    switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
    switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
    switch_condition = SwitchCondition(
        Branch(condition="${var} > 1", task=switch_child_1),
        Default(task=switch_child_2),
    )

    switch = Switch(name="switch", condition=switch_condition)
    parent >> switch
    pd.submit()

Dive Into

Task Switch.

class pydolphinscheduler.tasks.switch.Branch(condition: str, task: Task)[source]

Bases: SwitchBranch

Common condition branch for switch task.

If any condition in Branch match, would set this Branch’s task as downstream of task switch. If all condition branch do not match would set Default’s task as task switch downstream.

class pydolphinscheduler.tasks.switch.Default(task: Task)[source]

Bases: SwitchBranch

Class default branch for switch task.

If all condition of Branch do not match, task switch would run the tasks in Default and set Default’s task as switch downstream. Please notice that each switch condition could only have one single Default.

class pydolphinscheduler.tasks.switch.Switch(name: str, condition: SwitchCondition, *args, **kwargs)[source]

Bases: Task

Task switch object, declare behavior for switch task to dolphinscheduler.

Param of process definition or at least one local param of task must be set if task switch in this workflow.

_set_dep() None[source]

Set downstream according to parameter condition.

_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property task_params: Dict

Override Task.task_params for switch task.

switch task have some specials attribute switch, and in most of the task this attribute is None and use empty dict {} as default value. We do not use class attribute _task_custom_attr due to avoid attribute cover.

class pydolphinscheduler.tasks.switch.SwitchBranch(task: Task, exp: str | None = None)[source]

Bases: Base

Base class of ConditionBranch of task switch.

It a parent class for Branch and Default.

get_define(camel_attr: bool = True) Dict[source]

Get ConditionBranch definition attribute communicate to Java gateway server.

_DEFINE_ATTR: set = {'next_node'}
property condition: str | None

Get task switch property condition.

property next_node: str

Get task switch property next_node, it return task code when init class switch.

class pydolphinscheduler.tasks.switch.SwitchCondition(*args)[source]

Bases: Base

Set switch condition of given parameter.

get_define(camel_attr=True) Dict[source]

Overwrite Base.get_define to get task Condition specific get define.

set_define_attr() None[source]

Set attribute to function get_define().

It is a wrapper for both And and Or operator.

_DEFINE_ATTR: set = {'depend_task_list'}