Dependent

A dependent task type’s example and dive into information of PyDolphinScheduler.

Example

r"""
A example workflow for task dependent.

This example will create two workflows named `task_dependent` and `task_dependent_external`.
`task_dependent` is true workflow define and run task dependent, while `task_dependent_external`
define outside workflow and task from dependent.

After this script submit, we would get workflow as below:

task_dependent_external:

task_1
task_2
task_3

task_dependent:

task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2).
"""
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
from pydolphinscheduler.tasks.shell import Shell

with ProcessDefinition(
    name="task_dependent_external",
    tenant="tenant_exists",
) as pd:
    task_1 = Shell(name="task_1", command="echo task 1")
    task_2 = Shell(name="task_2", command="echo task 2")
    task_3 = Shell(name="task_3", command="echo task 3")
    pd.submit()

with ProcessDefinition(
    name="task_dependent_example",
    tenant="tenant_exists",
) as pd:
    task = Dependent(
        name="task_dependent",
        dependence=And(
            Or(
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    process_definition_name="task_dependent_external",
                    dependent_task_name="task_1",
                ),
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    process_definition_name="task_dependent_external",
                    dependent_task_name="task_2",
                ),
            )
        ),
    )
    pd.submit()

Dive Into

Task dependent.

class pydolphinscheduler.tasks.dependent.And(*args)[source]

Bases: DependentOperator

Operator And for task dependent.

It could accept both DependentItem and children of DependentOperator, and set AND condition to those args.

class pydolphinscheduler.tasks.dependent.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)[source]

Bases: Task

Task dependent object, declare behavior for dependent task to dolphinscheduler.

_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property task_params: Dict

Override Task.task_params for dependent task.

Dependent task have some specials attribute dependence, and in most of the task this attribute is None and use empty dict {} as default value. We do not use class attribute _task_custom_attr due to avoid attribute cover.

class pydolphinscheduler.tasks.dependent.DependentDate[source]

Bases: str

Constant of Dependent date value.

These values set according to Java server side, if you want to add and change it, please change Java server side first.

CURRENT_HOUR = 'currentHour'
LAST_FRIDAY = 'lastFriday'
LAST_MONDAY = 'lastMonday'
LAST_MONTH = 'lastMonth'
LAST_MONTH_BEGIN = 'lastMonthBegin'
LAST_MONTH_END = 'lastMonthEnd'
LAST_ONE_DAYS = 'last1Days'
LAST_ONE_HOUR = 'last1Hour'
LAST_SATURDAY = 'lastSaturday'
LAST_SEVEN_DAYS = 'last7Days'
LAST_SUNDAY = 'lastSunday'
LAST_THREE_DAYS = 'last3Days'
LAST_THREE_HOURS = 'last3Hours'
LAST_THURSDAY = 'lastThursday'
LAST_TUESDAY = 'lastTuesday'
LAST_TWENTY_FOUR_HOURS = 'last24Hours'
LAST_TWO_DAYS = 'last2Days'
LAST_TWO_HOURS = 'last2Hours'
LAST_WEDNESDAY = 'lastWednesday'
LAST_WEEK = 'lastWeek'
THIS_MONTH = 'thisMonth'
THIS_WEEK = 'thisWeek'
TODAY = 'today'
class pydolphinscheduler.tasks.dependent.DependentItem(project_name: str, process_definition_name: str, dependent_task_name: str | None = '0', dependent_date: DependentDate | None = 'today')[source]

Bases: Base

Dependent item object, minimal unit for task dependent.

It declare which project, process_definition, task are dependent to this task.

get_code_from_gateway() Dict[source]

Get project, definition, task code from given parameter.

_DEFINE_ATTR: set = {'cycle', 'date_value', 'definition_code', 'dep_task_code', 'project_code'}
property code_parameter: Tuple

Get name info parameter to query code.

property cycle: str

Get dependent cycle.

property date_value: str

Get dependent date.

property definition_code: str

Get dependent definition code.

property dep_task_code: str

Get dependent tasks code list.

property is_all_task: bool

Check whether dependent all tasks or not.

property project_code: str

Get dependent project code.

class pydolphinscheduler.tasks.dependent.DependentOperator(*args)[source]

Bases: Base

Set DependentItem or dependItemList with specific operator.

get_define(camel_attr=True) Dict[source]

Overwrite Base.get_define to get task dependent specific get define.

classmethod operator_name() str[source]

Get operator name in different class.

set_define_attr() str[source]

Set attribute to function get_define().

It is a wrapper for both And and Or operator.

_DEFINE_ATTR: set = {'relation'}
property relation: str

Get operator name in different class, for function get_define().

class pydolphinscheduler.tasks.dependent.Or(*args)[source]

Bases: DependentOperator

Operator Or for task dependent.

It could accept both DependentItem and children of DependentOperator, and set OR condition to those args.