Dependent

An example of the Dependent task type, followed by a closer look at the related PyDolphinScheduler API.

Example

r"""
A example workflow for task dependent.

This example will create two workflows named `task_dependent` and `task_dependent_external`.
`task_dependent` is true workflow define and run task dependent, while `task_dependent_external`
define outside workflow and task from dependent.

After this script submit, we would get workflow as below:

task_dependent_external:

task_1
task_2
task_3

task_dependent:

task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2).
"""
from pydolphinscheduler import configuration
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
from pydolphinscheduler.tasks.shell import Shell

# Upstream workflow whose tasks the dependent task will check.
with Workflow(
    name="task_dependent_external",
) as workflow:
    task_1 = Shell(name="task_1", command="echo task 1")
    task_2 = Shell(name="task_2", command="echo task 2")
    task_3 = Shell(name="task_3", command="echo task 3")
    workflow.submit()

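# Workflow holding the dependent task itself.
with Workflow(
    name="task_dependent_example",
) as workflow:
    task = Dependent(
        name="task_dependent",
        # And(Or(a, b)) reduces to: depend on task_1 OR task_2 of task_dependent_external.
        dependence=And(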
            Or(
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    workflow_name="task_dependent_external",
                    dependent_task_name="task_1",
                ),
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    workflow_name="task_dependent_external",
                    dependent_task_name="task_2",
                ),
            )
        ),
    )
    workflow.submit()

Dive Into

Task dependent.

class pydolphinscheduler.tasks.dependent.And(*args)

Bases: DependentOperator

Operator And for task dependent.

It accepts both DependentItem objects and DependentOperator subclasses (such as a nested Or), and combines those arguments with an AND condition.
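As a rough mental model of how nested operators combine, the sketch below evaluates an And/Or tree against a map of upstream task outcomes. It is a hypothetical, self-contained illustration: `Item`, `AndNode`, `OrNode` and `evaluate` are not part of PyDolphinScheduler, and the real check is performed by the DolphinScheduler server, which also takes the dependent date into account.

```python
from dataclasses import dataclass
from typing import Dict, Tuple, Union


# Hypothetical stand-ins that model only the boolean semantics of And/Or.
@dataclass(frozen=True)
class Item:
    """Leaf condition: the named upstream task must have succeeded."""
    name: str


@dataclass(frozen=True)
class AndNode:
    children: Tuple["Node", ...]


@dataclass(frozen=True)
class OrNode:
    children: Tuple["Node", ...]


Node = Union[Item, AndNode, OrNode]


def evaluate(node: Node, succeeded: Dict[str, bool]) -> bool:
    """Recursively resolve a condition tree; unknown tasks count as not succeeded."""
    if isinstance(node, Item):
        return succeeded.get(node.name, False)
    results = (evaluate(child, succeeded) for child in node.children)
    # AND needs every child to hold, OR needs at least one.
    return all(results) if isinstance(node, AndNode) else any(results)


# Same shape as And(Or(task_1, task_2), task_3).
condition = AndNode((OrNode((Item("task_1"), Item("task_2"))), Item("task_3")))
print(evaluate(condition, {"task_1": False, "task_2": True, "task_3": True}))  # True
print(evaluate(condition, {"task_1": True, "task_2": True, "task_3": False}))  # False
```

Nesting is what lets a single Dependent task express mixed conditions; each operator only needs to know how to fold its direct children.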

class pydolphinscheduler.tasks.dependent.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)

Bases: Task

Dependent task object; declares the behavior of a dependent task to DolphinScheduler.

_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
property task_params: Dict

Override Task.task_params for dependent task.

The dependent task has a special attribute, dependence, which most other tasks lack: for them it is None, and an empty dict {} is used as the default value. The class attribute _task_custom_attr is not used here, to avoid this attribute being covered (overwritten) by that default.
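The reasoning in the paragraph above can be sketched with toy classes. This is a hypothetical simplification, assuming the base class writes a default dependence of {} after copying the names listed in _task_custom_attr; the real Task.task_params in PyDolphinScheduler is more involved. `CoveredDependent` shows the attribute cover that is being avoided, and `Dependent` shows the override approach.

```python
from typing import Any, Dict, Set


class Task:
    """Hypothetical, simplified base task (not the real pydolphinscheduler Task)."""

    # Extra attribute names copied into task_params by the base class.
    _task_custom_attr: Set[str] = set()

    def __init__(self, name: str) -> None:
        self.name = name

    @property
    def task_params(self) -> Dict[str, Any]:
        params = {attr: getattr(self, attr) for attr in sorted(self._task_custom_attr)}
        # Most task types have no dependence, so the base fills in an empty default,
        # overwriting ("covering") any value copied above under the same key.
        params["dependence"] = {}
        return params


class CoveredDependent(Task):
    """Counter-example: routing dependence through _task_custom_attr gets covered."""

    _task_custom_attr = {"dependence"}

    def __init__(self, name: str, dependence: Dict[str, Any]) -> None:
        super().__init__(name)
        self.dependence = dependence


class Dependent(Task):
    """Overrides the property instead, so the real value is written last."""

    def __init__(self, name: str, dependence: Dict[str, Any]) -> None:
        super().__init__(name)
        self.dependence = dependence

    @property
    def task_params(self) -> Dict[str, Any]:
        params = super().task_params
        params["dependence"] = self.dependence
        return params


print(CoveredDependent("bad", {"relation": "AND"}).task_params)  # {'dependence': {}}
print(Dependent("good", {"relation": "AND"}).task_params)  # {'dependence': {'relation': 'AND'}}
```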

class pydolphinscheduler.tasks.dependent.DependentDate

Bases: str

Constants for dependent date values.

These values are set according to the Java server models; to add or change a value, change the Java server models first.

CURRENT_HOUR = 'currentHour'
LAST_FRIDAY = 'lastFriday'
LAST_MONDAY = 'lastMonday'
LAST_MONTH = 'lastMonth'
LAST_MONTH_BEGIN = 'lastMonthBegin'
LAST_MONTH_END = 'lastMonthEnd'
LAST_ONE_DAYS = 'last1Days'
LAST_ONE_HOUR = 'last1Hour'
LAST_SATURDAY = 'lastSaturday'
LAST_SEVEN_DAYS = 'last7Days'
LAST_SUNDAY = 'lastSunday'
LAST_THREE_DAYS = 'last3Days'
LAST_THREE_HOURS = 'last3Hours'
LAST_THURSDAY = 'lastThursday'
LAST_TUESDAY = 'lastTuesday'
LAST_TWENTY_FOUR_HOURS = 'last24Hours'
LAST_TWO_DAYS = 'last2Days'
LAST_TWO_HOURS = 'last2Hours'
LAST_WEDNESDAY = 'lastWednesday'
LAST_WEEK = 'lastWeek'
THIS_MONTH = 'thisMonth'
THIS_WEEK = 'thisWeek'
TODAY = 'today'
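The YAML example on this page writes dependent_date both as a constant name (LAST_WEDNESDAY) and as a raw value (last24Hours). A hypothetical helper that normalizes either form to the server value could look like this; only a subset of the constants above is copied in, and `normalize_dependent_date` is not part of PyDolphinScheduler.

```python
from typing import Dict

# A subset of the DependentDate constants listed above: name -> server value.
DEPENDENT_DATES: Dict[str, str] = {
    "TODAY": "today",
    "LAST_ONE_DAYS": "last1Days",
    "LAST_TWENTY_FOUR_HOURS": "last24Hours",
    "LAST_WEDNESDAY": "lastWednesday",
    "THIS_WEEK": "thisWeek",
}


def normalize_dependent_date(value: str) -> str:
    """Accept either a constant name (LAST_WEDNESDAY) or its value (lastWednesday)."""
    by_name = DEPENDENT_DATES.get(value.upper())
    if by_name is not None:
        return by_name
    if value in DEPENDENT_DATES.values():
        return value
    raise ValueError(f"unknown dependent date: {value!r}")


print(normalize_dependent_date("LAST_WEDNESDAY"))  # lastWednesday
print(normalize_dependent_date("last24Hours"))  # last24Hours
```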
class pydolphinscheduler.tasks.dependent.DependentItem(project_name: str, workflow_name: str | None = None, dependent_task_name: str | None = '0', dependent_date: DependentDate | None = 'today', *args, **kwargs)

Bases: Base

Dependent item object, the minimal unit of a task dependency.

It declares which project, workflow, and task this task depends on.

get_code_from_gateway() → Dict

Get the project, definition, and task codes for the given parameters.

_DEFINE_ATTR: set = {'cycle', 'date_value', 'definition_code', 'dep_task_code', 'project_code'}
property code_parameter: Tuple

Get the name parameters used to query the codes.
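DependentItem is declared with human-readable names, while the server definition stores codes, so the names have to be resolved. The sketch below replaces the Java gateway with an in-memory dictionary; `Codes`, `FAKE_SERVER`, `code_parameter` and `get_codes` are hypothetical, and the code values are illustrative placeholders. The real get_code_from_gateway() queries a running DolphinScheduler through the Python gateway.

```python
from typing import Dict, NamedTuple, Tuple

NameKey = Tuple[str, str, str]  # (project_name, workflow_name, dependent_task_name)


class Codes(NamedTuple):
    project_code: int
    definition_code: int
    dep_task_code: int


# Hypothetical stand-in for the server: illustrative codes, not real ones.
FAKE_SERVER: Dict[NameKey, Codes] = {
    ("pydolphin", "task_dependent_external", "task_1"): Codes(1001, 2001, 3001),
    ("pydolphin", "task_dependent_external", "task_2"): Codes(1001, 2001, 3002),
}


def code_parameter(project_name: str, workflow_name: str, dependent_task_name: str) -> NameKey:
    """Bundle the names that identify one dependency."""
    return project_name, workflow_name, dependent_task_name


def get_codes(key: NameKey) -> Codes:
    """Resolve names to codes, failing loudly if the server does not know them."""
    try:
        return FAKE_SERVER[key]
    except KeyError:
        raise LookupError(f"unknown project/workflow/task: {key}") from None


codes = get_codes(code_parameter("pydolphin", "task_dependent_external", "task_2"))
print(codes.dep_task_code)  # 3002
```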

property cycle: str

Get dependent cycle.

property date_value: str

Get dependent date.

property definition_code: str

Get dependent definition code.

property dep_task_code: str

Get the code list of the dependent tasks.

property is_all_task: bool

Check whether the item depends on all tasks in the workflow.
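Reading the default dependent_task_name='0' in the DependentItem signature as a sentinel, is_all_task can be sketched as a simple comparison. This is an assumption drawn from that default, not a copy of the library source; `DependentItemSketch` is a hypothetical stand-in that keeps only the name fields and skips the gateway lookup.

```python
from dataclasses import dataclass
from typing import Optional

# Assumption: '0' (the signature default) means "every task in the workflow".
ALL_TASKS = "0"


@dataclass
class DependentItemSketch:
    project_name: str
    workflow_name: Optional[str] = None
    dependent_task_name: Optional[str] = ALL_TASKS
    dependent_date: Optional[str] = "today"

    @property
    def is_all_task(self) -> bool:
        return self.dependent_task_name == ALL_TASKS


whole = DependentItemSketch("pydolphin", "task_dependent_external")
single = DependentItemSketch("pydolphin", "task_dependent_external", "task_1")
print(whole.is_all_task, single.is_all_task)  # True False
```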

property project_code: str

Get dependent project code.

class pydolphinscheduler.tasks.dependent.DependentOperator(*args)

Bases: Base

Apply a specific operator to DependentItem objects or a dependItemList.

get_define(camel_attr=True) → Dict

Override Base.get_define to produce the dependent-task-specific definition.

classmethod operator_name() → str

Get the operator name of each subclass.

set_define_attr() → str

Set the attribute used by get_define().

It is a wrapper for both the And and Or operators.

_DEFINE_ATTR: set = {'relation'}
property relation: str

Get the operator name of each subclass, for use in get_define().
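The methods above turn a tree of operators and items into a nested definition. The following hypothetical sketch mirrors that flow under stated assumptions: attribute names come from the _DEFINE_ATTR sets shown above and are converted to camelCase (as with camel_attr=True); the relation is derived from the class name; and the list key is dependItemList when all arguments are items or dependTaskList when all are operators. The dependTaskList key, the same-type rule and the class-name derivation are assumptions, and `ItemSketch`, `And` and `Or` here are toy classes, not the PyDolphinScheduler ones.

```python
import re
from typing import Any, Dict, List


def to_camel(name: str) -> str:
    """snake_case -> camelCase, e.g. date_value -> dateValue."""
    return re.sub(r"_([a-z])", lambda m: m.group(1).upper(), name)


class ItemSketch:
    """Toy dependent item; attribute names copied from DependentItem._DEFINE_ATTR."""

    _DEFINE_ATTR = {"cycle", "date_value", "definition_code", "dep_task_code", "project_code"}

    def __init__(self, **values: Any) -> None:
        for attr in self._DEFINE_ATTR:
            setattr(self, attr, values[attr])

    def get_define(self) -> Dict[str, Any]:
        return {to_camel(attr): getattr(self, attr) for attr in sorted(self._DEFINE_ATTR)}


class OperatorSketch:
    """Toy counterpart of DependentOperator."""

    def __init__(self, *args: Any) -> None:
        self.args = args

    @classmethod
    def operator_name(cls) -> str:
        # Assumption: the relation is the upper-cased subclass name (And -> "AND").
        return cls.__name__.upper()

    @property
    def relation(self) -> str:
        return self.operator_name()

    def get_define(self) -> Dict[str, Any]:
        if all(isinstance(arg, ItemSketch) for arg in self.args):
            key = "dependItemList"
        elif all(isinstance(arg, OperatorSketch) for arg in self.args):
            key = "dependTaskList"
        else:
            raise ValueError("arguments must all be items or all be operators")
        children: List[Dict[str, Any]] = [arg.get_define() for arg in self.args]
        return {"relation": self.relation, key: children}


class And(OperatorSketch):
    pass


class Or(OperatorSketch):
    pass


# Illustrative values only.
item = ItemSketch(cycle="day", date_value="today", definition_code=2001,
                  dep_task_code=3001, project_code=1001)
print(And(Or(item, item)).get_define())
```

Deriving the relation from the class name keeps And and Or as empty subclasses; only the base class carries serialization logic.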

class pydolphinscheduler.tasks.dependent.Or(*args)

Bases: DependentOperator

Operator Or for task dependent.

It accepts both DependentItem objects and DependentOperator subclasses (such as a nested And), and combines those arguments with an OR condition.

YAML file example

workflow:
  name: "Dependent"

# Define the tasks within the workflow
tasks:
  - name: dependent
    task_type: Dependent
    denpendence:
    op: and
    groups:
      - op: or
        groups:
          - project_name: pydolphin
            workflow_name: task_dependent_external
            dependent_task_name: task_1

          - project_name: pydolphin
            workflow_name: task_dependent_external
            dependent_task_name: task_2

      - op: and
        groups:
          - project_name: pydolphin
            workflow_name: task_dependent_external
            dependent_task_name: task_1
            dependent_date: LAST_WEDNESDAY 

          - project_name: pydolphin
            workflow_name: task_dependent_external
            dependent_task_name: task_2
            dependent_date: last24Hours 

  - name: dependent_var
    task_type: Dependent
    denpendence:
    op: and
    groups:
      - op: or
        # ${CONFIG.WORKFLOW_PROJECT} is replaced with the value of configuration.WORKFLOW_PROJECT
        # $WORKFLOW{"Dependent_External.yaml"} creates or updates a workflow from Dependent_External.yaml and is replaced with that workflow's name
        groups:
          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
            dependent_task_name: task_1

          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
            dependent_task_name: task_2
      - op: and
        groups:
          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
            dependent_task_name: task_1
            dependent_date: LAST_WEDNESDAY 

          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
            dependent_task_name: task_2
            dependent_date: last24Hours 

Dependent_External.yaml:

# Define the workflow
workflow:
  name: "task_dependent_external"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }