Switch

An example of the switch task type, followed by a closer look at its implementation in PyDolphinScheduler.

Example

r"""
An example workflow for the switch task.

This example creates four tasks in a single workflow: three shell tasks and one switch task. The switch task
has one upstream task, declared explicitly with the syntax `parent >> switch`, and two downstream tasks whose
dependencies the switch task sets automatically from the parameter `condition`. The workflow graph looks like:
                      --> switch_child_1
                    /
parent -> switch ->
                    \
                      --> switch_child_2
.
"""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition

with Workflow(name="task_switch_example", param={"var": "1"}) as workflow:
    parent = Shell(name="parent", command="echo parent")
    switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
    switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
    switch_condition = SwitchCondition(
        Branch(condition="${var} > 1", task=switch_child_1),
        Default(task=switch_child_2),
    )

    switch = Switch(name="switch", condition=switch_condition)
    parent >> switch
    workflow.submit()

Dive Into

Task Switch.

class pydolphinscheduler.tasks.switch.Branch(condition: str, task: Task)

Bases: SwitchBranch

Common condition branch for switch task.

If the condition of a Branch matches, that Branch’s task is set as the downstream of the switch task. If no Branch condition matches, Default’s task is set as the downstream of the switch task instead.

class pydolphinscheduler.tasks.switch.Default(task: Task)

Bases: SwitchBranch

Default branch for the switch task.

If no Branch condition matches, the switch task runs the task in Default, which is set as the downstream of the switch task. Note that each switch condition can have only one Default.
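The fallback behavior described for Branch and Default can be sketched in plain Python. This is only an illustrative model, not DolphinScheduler's evaluator: the helper names `substitute`, `matches`, and `pick_downstream` are hypothetical, the sketch handles only a single numeric comparison per condition, and it assumes that branches are checked in declaration order with the first match winning.

```python
import operator
import re

# Two-character operators come first so ">=" is not mistaken for ">".
_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
}


def substitute(expr, params):
    """Replace each ${name} placeholder with its value from params."""
    return re.sub(r"\$\{(\w+)\}", lambda m: str(params[m.group(1)]), expr)


def matches(expr, params):
    """Evaluate one numeric comparison such as '${var} > 1' (assumed form)."""
    resolved = substitute(expr, params)
    for symbol, func in _OPS.items():
        if symbol in resolved:
            left, right = resolved.split(symbol, 1)
            return func(float(left), float(right))
    raise ValueError(f"unsupported condition: {expr!r}")


def pick_downstream(branches, default, params):
    """Return the task of the first matching branch, else the default task."""
    for condition, task_name in branches:
        if matches(condition, params):
            return task_name
    return default


branches = [("${var} > 1", "switch_child_1")]
chosen = pick_downstream(branches, "switch_child_2", {"var": "1"})
```

With the workflow parameter `var` set to `"1"`, as in the example above, the condition `${var} > 1` does not hold, so the model falls through to the default task.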

class pydolphinscheduler.tasks.switch.Switch(name: str, condition: SwitchCondition, *args, **kwargs)

Bases: BatchTask

Switch task object, which declares the behavior of the switch task to DolphinScheduler.

A workflow param, or at least one local param of a task, must be set if the workflow contains a switch task.

_set_dep() -> None

Set downstream according to parameter condition.

_downstream_task_codes: set[int]
_task_ignore_attr: set = {'condition_result', 'dependence'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property task_params: dict

Override Task.task_params for switch task.

The switch task has a special attribute, switch. In most other tasks this attribute is None, and an empty dict {} is used as its default value. The class attribute _task_custom_attr is not used here, to avoid overwriting attributes.

class pydolphinscheduler.tasks.switch.SwitchBranch(task: Task, exp: str | None = None)

Bases: Base

Base class of ConditionBranch for the switch task.

It is the parent class of Branch and Default.

get_define(camel_attr: bool = True) -> dict

Get the ConditionBranch definition attributes that are communicated to the Java gateway server.

_DEFINE_ATTR: set = {'next_node'}
property condition: str | None

Get task switch property condition.

property next_node: str

Get the switch task property next_node; it returns the task code when the Switch class is initialized.
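As a rough illustration of how `get_define`, `camel_attr`, and `_DEFINE_ATTR` might fit together, the sketch below models a branch as a plain object and emits its definition as a dict. The class `BranchModel` and the helper `snake_to_camel` are hypothetical stand-ins, not the library's classes, and the camelCase key conversion is an assumption inferred from the `camel_attr` parameter name.

```python
def snake_to_camel(name):
    """Convert a snake_case attribute name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


class BranchModel:
    """Hypothetical stand-in for a switch branch; not the library class."""

    def __init__(self, next_node, condition=None):
        self.next_node = next_node  # placeholder for the downstream task code
        self.condition = condition  # None models a default branch

    def get_define(self, camel_attr=True):
        # Always export next_node; export condition only when one is set.
        attrs = ["next_node"]
        if self.condition:
            attrs.append("condition")
        return {
            (snake_to_camel(attr) if camel_attr else attr): getattr(self, attr)
            for attr in attrs
        }


conditional = BranchModel(123, "${var} > 1").get_define()
fallback = BranchModel(456).get_define(camel_attr=False)
```

In this model a branch without a condition exports only its downstream reference, which mirrors how Default carries a task but no condition.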

class pydolphinscheduler.tasks.switch.SwitchCondition(*args)

Bases: Base

Set switch condition of given parameter.

get_define(camel_attr=True) -> dict

Override Base.get_define to return the condition-specific definition.

set_define_attr() -> None

Set the attributes used by get_define().

It is a wrapper for both the And and Or operators.

_DEFINE_ATTR: set = {'depend_task_list'}

YAML file example

# Define the workflow
workflow:
  name: "Switch"
  param:
    var: 1

# Define the tasks within the workflow
tasks:
  - name: switch_child_1
    task_type: Shell
    command: echo switch_child_1

  - name: switch_child_2
    task_type: Shell
    command: echo switch_child_2

  - name: switch
    task_type: Switch
    condition:
      - task: switch_child_1
        condition: "${var} > 1"
      - task: switch_child_2
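
In the YAML above, the entry that has a `task` but no `condition` appears to play the role of Default, while entries with a `condition` correspond to Branch. The sketch below shows one way such a list could be split and checked against the rule that a switch condition can have only one Default. The helper `split_conditions` is hypothetical, and plain dicts stand in for the parsed YAML so the snippet needs no YAML library.

```python
def split_conditions(entries):
    """Separate conditional branches from the single default branch."""
    branches, defaults = [], []
    for entry in entries:
        if "task" not in entry:
            raise ValueError(f"missing 'task' in {entry!r}")
        if entry.get("condition") is None:
            # No condition given: treat this entry as the default branch.
            defaults.append(entry["task"])
        else:
            branches.append((entry["condition"], entry["task"]))
    if len(defaults) > 1:
        raise ValueError("a switch condition can have only one default branch")
    return branches, (defaults[0] if defaults else None)


# Same data as the `condition` list of the switch task in the YAML example.
yaml_condition = [
    {"task": "switch_child_1", "condition": "${var} > 1"},
    {"task": "switch_child_2"},
]
branches, default = split_conditions(yaml_condition)
```

Here `branches` holds the single conditional entry and `default` is `switch_child_2`; adding a second entry without a `condition` makes the helper raise `ValueError`.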