Condition

An example of the condition task type, followed by a dive into its implementation in PyDolphinScheduler.

Example

r"""
A example workflow for task condition.

This example will create five task in single workflow, with four shell task and one condition task. Task
condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
like:
pre_task_1 ->                     -> success_branch
             \                  /
pre_task_2 ->  -> conditions ->
             /                  \
pre_task_3 ->                     -> fail_branch
.
"""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
from pydolphinscheduler.tasks.shell import Shell

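# "tenant_exists" should be replaced with a tenant that already exists in DolphinScheduler.
with Workflow(name="task_condition_example", tenant="tenant_exists") as workflow:
    # Upstream tasks whose final states the condition checks.
    pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
    pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
    pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
    # Holds when pre_task_1 and pre_task_2 succeed and pre_task_3 fails.
    cond_operator = And(
        And(
            SUCCESS(pre_task_1, pre_task_2),
            FAILURE(pre_task_3),
        ),
    )

    # success_branch runs when cond_operator holds, fail_branch otherwise.
    success_branch = Shell(name="success_branch", command="echo success_branch")
    fail_branch = Shell(name="fail_branch", command="echo fail_branch")

    # Dependencies are derived from these parameters, so no explicit `>>` is needed.
    condition = Condition(
        name="condition",
        condition=cond_operator,
        success_task=success_branch,
        failed_task=fail_branch,
    )
    # Submit the workflow definition to the DolphinScheduler server.
    workflow.submit()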
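To make the branching rule concrete, here is a pure-Python sketch that evaluates the example's `cond_operator` against hypothetical final states of the three upstream tasks. It is a model under stated assumptions, not PyDolphinScheduler code: the actual evaluation happens on the DolphinScheduler server. It assumes that each task listed in SUCCESS(...) or FAILURE(...) contributes one separate check, and that the enclosing operator's relation (AND or OR) is applied across those checks. The names `success_items`, `failure_items`, `Op`, and `choose_branch` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Union

# (task name, final state the task must reach): a hypothetical stand-in for
# one check that a SUCCESS/FAILURE status produces per task.
Item = Tuple[str, str]


def success_items(*tasks: str) -> List[Item]:
    """Hypothetical analogue of SUCCESS(*tasks): one check per task."""
    return [(task, "SUCCESS") for task in tasks]


def failure_items(*tasks: str) -> List[Item]:
    """Hypothetical analogue of FAILURE(*tasks): one check per task."""
    return [(task, "FAILURE") for task in tasks]


@dataclass
class Op:
    """Hypothetical analogue of And/Or: applies one relation to all of its children."""

    relation: str  # "AND" or "OR"
    children: List[Union["Op", List[Item]]] = field(default_factory=list)

    def holds(self, states: Dict[str, str]) -> bool:
        results: List[bool] = []
        for child in self.children:
            if isinstance(child, Op):
                results.append(child.holds(states))
            else:
                # Each task in a status counts as a separate check under this relation.
                results.extend(states[task] == expected for task, expected in child)
        return all(results) if self.relation == "AND" else any(results)


def choose_branch(cond: Op, states: Dict[str, str], success_task: str, failed_task: str) -> str:
    """Return the task the condition routes to under this model."""
    return success_task if cond.holds(states) else failed_task


# Mirrors And(And(SUCCESS(pre_task_1, pre_task_2), FAILURE(pre_task_3))).
cond = Op("AND", [Op("AND", [success_items("pre_task_1", "pre_task_2"), failure_items("pre_task_3")])])

states = {"pre_task_1": "SUCCESS", "pre_task_2": "SUCCESS", "pre_task_3": "FAILURE"}
print(choose_branch(cond, states, "success_branch", "fail_branch"))  # success_branch

states["pre_task_3"] = "SUCCESS"
print(choose_branch(cond, states, "success_branch", "fail_branch"))  # fail_branch
```

Under this model, swapping the inner relation to OR would send the workflow to success_branch as soon as any single check passes.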

Dive Into

Task Conditions.

class pydolphinscheduler.tasks.condition.And(*args)

Bases: ConditionOperator

Operator And for task condition.

It accepts both Task objects and children of ConditionOperator, and applies an AND relation to those arguments.

class pydolphinscheduler.tasks.condition.Condition(name: str, condition: ConditionOperator, success_task: Task, failed_task: Task, *args, **kwargs)

Bases: Task

Condition task object; declares the behavior of a condition task to DolphinScheduler.

_set_dep() -> None

Set upstream tasks according to the parameter condition.
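The dependency wiring can be sketched with plain Python so that it runs without a DolphinScheduler server. The sketch assumes, consistent with the graph in the example's docstring, that every task named inside the nested condition becomes an upstream of the condition task and that success_task and failed_task become its downstream tasks. The nested-tuple encoding and the functions `collect_tasks` and `condition_edges` are hypothetical, and the recursive walk is a simplification rather than a copy of `_set_dep`.

```python
from typing import List, Tuple

# Hypothetical encoding of a condition: an operator is ("AND" | "OR", [children]),
# a status is ("SUCCESS" | "FAILURE", [task names]).
Node = Tuple[str, list]
OPERATORS = {"AND", "OR"}


def collect_tasks(node: Node) -> List[str]:
    """Return every task name referenced by a status anywhere under node, in order."""
    kind, children = node
    if kind not in OPERATORS:
        return list(children)  # a status: its children are task names
    tasks: List[str] = []
    for child in children:
        tasks.extend(collect_tasks(child))
    return tasks


def condition_edges(name: str, condition: Node, success_task: str, failed_task: str) -> List[Tuple[str, str]]:
    """Return the (upstream, downstream) edges that surround a condition task."""
    upstream = [(task, name) for task in collect_tasks(condition)]
    downstream = [(name, success_task), (name, failed_task)]
    return upstream + downstream


cond = ("AND", [("AND", [("SUCCESS", ["pre_task_1", "pre_task_2"]), ("FAILURE", ["pre_task_3"])])])
for upstream, downstream in condition_edges("condition", cond, "success_branch", "fail_branch"):
    print(f"{upstream} -> {downstream}")
```

For the example's condition this prints five edges: three from pre_task_1, pre_task_2, and pre_task_3 into condition, and two from condition to success_branch and fail_branch, matching the docstring graph.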

_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
property condition_result: Dict

Get the condition result definition for the Java gateway.

property task_params: Dict

Override Task.task_params for the Condition task.

The Condition task has a special attribute, dependence. In most other tasks this attribute is None and an empty dict {} is used as its default value. The class attribute _task_custom_attr is deliberately not used here, to avoid overwriting that attribute.
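The override can be pictured as: start from the generic parameters, then attach the condition-specific entries per instance instead of declaring them in a class-level attribute. This is a sketch under assumptions: `BASE_PARAMS`, `build_task_params`, the task codes, and the key names `conditionResult`, `successNode`, and `failedNode` are illustrative and assumed to resemble the JSON DolphinScheduler uses for condition tasks; they are not copied from PyDolphinScheduler. The deep copy shows why per-instance assignment avoids overwriting a shared default.

```python
import copy
from typing import Any, Dict, List

# Hypothetical generic parameters shared by most task types; for them,
# "dependence" stays an empty dict by default.
BASE_PARAMS: Dict[str, Any] = {"localParams": [], "resourceList": [], "dependence": {}}


def build_task_params(dependence: Dict[str, Any], success_codes: List[int], failed_codes: List[int]) -> Dict[str, Any]:
    """Return per-instance params for a condition task (illustrative only)."""
    params = copy.deepcopy(BASE_PARAMS)  # never mutate the shared defaults
    params["dependence"] = dependence
    params["conditionResult"] = {"successNode": success_codes, "failedNode": failed_codes}
    return params


params = build_task_params({"relation": "AND", "dependTaskList": []}, [101], [102])
print(params["conditionResult"])  # {'successNode': [101], 'failedNode': [102]}
print(BASE_PARAMS["dependence"])  # {} (the shared default is untouched)
```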

class pydolphinscheduler.tasks.condition.ConditionOperator(*args)

Bases: Base

Combine ConditionTask or ConditionOperator objects with a specific operator.

get_define(camel_attr=True) -> Dict

Override Base.get_define to produce the definition specific to the Condition task.

classmethod operator_name() -> str

Get the operator name of each subclass.

set_define_attr() -> str

Set the attributes used by get_define().

It is a wrapper shared by both the And and Or operators.

_DEFINE_ATTR: set = {'relation'}
property relation: str

Get the operator name of the concrete subclass, for use in get_define().
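The operator methods above can be approximated with a self-contained sketch. It assumes that relation is the upper-cased class name (so And yields "AND"), that one operator holds either statuses or nested operators but not a mix of both, and that those children are serialized under `dependItemList` or `dependTaskList` respectively. `Item`, `Operator`, the stand-in `And`/`Or` classes, and the key names are hypothetical. Under these assumptions, the same-kind rule is what makes the example wrap its statuses in And(And(...)).

```python
from typing import Any, Dict, List


class Item:
    """Hypothetical stand-in for a status such as SUCCESS(task): yields one dict per task code."""

    def __init__(self, status: str, *task_codes: int) -> None:
        self.status = status
        self.task_codes = task_codes

    def get_define(self) -> List[Dict[str, Any]]:
        return [{"depTaskCode": code, "status": self.status} for code in self.task_codes]


class Operator:
    """Hypothetical base for And/Or; the relation is derived from the class name."""

    def __init__(self, *args: Any) -> None:
        self.args = args

    @classmethod
    def operator_name(cls) -> str:
        return cls.__name__.upper()

    def get_define(self) -> Dict[str, Any]:
        if all(isinstance(arg, Item) for arg in self.args):
            # Statuses are flattened into a single list of items under this relation.
            items: List[Dict[str, Any]] = []
            for arg in self.args:
                items.extend(arg.get_define())
            return {"relation": self.operator_name(), "dependItemList": items}
        if all(isinstance(arg, Operator) for arg in self.args):
            return {"relation": self.operator_name(), "dependTaskList": [arg.get_define() for arg in self.args]}
        raise ValueError(f"{self.operator_name()} arguments must be all statuses or all operators")


class And(Operator):
    pass


class Or(Operator):
    pass


print(And(And(Item("SUCCESS", 1, 2), Item("FAILURE", 3))).get_define())
```

Mixing kinds in one operator, for example And(Item("SUCCESS", 1), And(Item("FAILURE", 2))), raises ValueError in this sketch.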

class pydolphinscheduler.tasks.condition.FAILURE(*tasks)

Bases: Status

Status FAILURE for the condition task; a subclass of Status.

class pydolphinscheduler.tasks.condition.Or(*args)

Bases: ConditionOperator

Operator Or for task condition.

It accepts both Task objects and children of ConditionOperator, and applies an OR relation to those arguments.

class pydolphinscheduler.tasks.condition.SUCCESS(*tasks)

Bases: Status

Status SUCCESS for the condition task; a subclass of Status.

class pydolphinscheduler.tasks.condition.Status(*tasks)

Bases: Base

Base class of Condition task status.

It is the parent class of SUCCESS and FAILURE, providing the status name and get_define() to its subclasses.

get_define(camel_attr: bool = True) -> List

Get the status definition attributes sent to the Java gateway server.

classmethod status_name() -> str

Get the name of Status or of its subclass.
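A sketch of how a Status base class could derive its status name from the subclass name and emit one entry per task. The upper-cased class-name rule, the `depTaskCode`/`status` keys, and the `FakeTask` class are assumptions for illustration; the documentation above only states that get_define returns a list sent to the Java gateway server.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeTask:
    """Hypothetical task carrying only the numeric code the definition needs."""

    name: str
    code: int


class Status:
    """Sketch of a status base class shared by SUCCESS and FAILURE."""

    def __init__(self, *tasks: FakeTask) -> None:
        self.tasks = tasks

    @classmethod
    def status_name(cls) -> str:
        # Assumed rule: the subclass name itself is the status keyword.
        return cls.__name__.upper()

    def get_define(self) -> List[Dict[str, Any]]:
        return [{"depTaskCode": task.code, "status": self.status_name()} for task in self.tasks]


class SUCCESS(Status):
    pass


class FAILURE(Status):
    pass


print(FAILURE(FakeTask("pre_task_3", 3)).get_define())
# [{'depTaskCode': 3, 'status': 'FAILURE'}]
```

Because status_name is a classmethod, each subclass gets its own name without overriding anything.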

YAML file example

# Define the workflow
workflow:
  name: "Condition"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
  - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
  - { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
  - { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
  - { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }

  - name: condition
    task_type: Condition
    success_task: success_branch
    failed_task: fail_branch
    op: AND
    groups:
      - op: AND
        groups:
          - task: pre_task_1
            flag: true
          - task: pre_task_2
            flag: true
          - task: pre_task_3
            flag: false
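Comparing the two examples, each YAML op corresponds to And/Or, flag: true corresponds to SUCCESS, and flag: false to FAILURE (pre_task_3 is FAILURE in the Python version). The sketch below renders the YAML groups as an And/Or expression in the style of the Python example. It takes the already-parsed YAML as a dict, so no YAML library is needed; `to_expression` is a hypothetical helper, not part of the PyDolphinScheduler YAML loader.

```python
from typing import Any, Dict, List

# The condition task's op/groups fields from the YAML above, as a parsed dict.
condition_yaml: Dict[str, Any] = {
    "op": "AND",
    "groups": [
        {
            "op": "AND",
            "groups": [
                {"task": "pre_task_1", "flag": True},
                {"task": "pre_task_2", "flag": True},
                {"task": "pre_task_3", "flag": False},
            ],
        }
    ],
}


def to_expression(node: Dict[str, Any]) -> str:
    """Render a YAML condition group as an And/Or/SUCCESS/FAILURE expression string."""
    if "task" in node:
        # flag true -> SUCCESS, flag false -> FAILURE, matching the Python example.
        status = "SUCCESS" if node["flag"] else "FAILURE"
        return f"{status}({node['task']})"
    operator = node["op"].capitalize()  # "AND" -> "And", "OR" -> "Or"
    children: List[str] = [to_expression(child) for child in node["groups"]]
    return f"{operator}({', '.join(children)})"


print(to_expression(condition_yaml))
# And(And(SUCCESS(pre_task_1), SUCCESS(pre_task_2), FAILURE(pre_task_3)))
```

The YAML lists one task per entry, whereas the Python example groups pre_task_1 and pre_task_2 into a single SUCCESS(...) call.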