Dependent
An example of the dependent task type, and a dive into its details in PyDolphinScheduler.
Example
```python
r"""
An example workflow for task dependent.

This example creates two workflows named `task_dependent_example` and `task_dependent_external`.
`task_dependent_external` defines the upstream workflow and the tasks that are depended on, while
`task_dependent_example` defines and runs the dependent task itself.

After this script is submitted, we get the following workflows:

task_dependent_external:

    task_1
    task_2
    task_3

task_dependent_example:

    task_dependent (depends on task_dependent_external.task_1 or task_dependent_external.task_2)
"""
from pydolphinscheduler import configuration
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
from pydolphinscheduler.tasks.shell import Shell

# Upstream workflow whose tasks are depended on.
with Workflow(
    name="task_dependent_external",
) as workflow:
    task_1 = Shell(name="task_1", command="echo task 1")
    task_2 = Shell(name="task_2", command="echo task 2")
    task_3 = Shell(name="task_3", command="echo task 3")
    workflow.submit()

# Workflow holding the dependent task: satisfied when task_1 OR task_2 is satisfied.
with Workflow(
    name="task_dependent_example",
) as workflow:
    task = Dependent(
        name="task_dependent",
        dependence=And(
            Or(
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    workflow_name="task_dependent_external",
                    dependent_task_name="task_1",
                ),
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    workflow_name="task_dependent_external",
                    dependent_task_name="task_2",
                ),
            )
        ),
    )
    workflow.submit()
```
Dive Into
Task dependent.
- class pydolphinscheduler.tasks.dependent.And(*args)
Bases: DependentOperator
Operator And for task dependent.
It accepts both DependentItem and children of DependentOperator, and sets an AND condition on those args.
- class pydolphinscheduler.tasks.dependent.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)
Bases: BatchTask
Task dependent object, declaring the behavior of a dependent task to DolphinScheduler.
- _downstream_task_codes: set[int]
- _task_relation: set[TaskRelation]
- _timeout: timedelta | int
- _upstream_task_codes: set[int]
- property task_params: dict
Override Task.task_params for dependent task.
The dependent task has a special attribute, dependence; in most other tasks this attribute is None and an empty dict {} is used as its default value. The class attribute _task_custom_attr is not used here, to avoid the attribute being overwritten.
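The override described above can be pictured with two hypothetical classes, Task and Dependent, that only mimic the shape of the real ones: the base property returns an empty dependence dict, and the dependent subclass fills in its own value. This is a minimal sketch under that assumption, not pydolphinscheduler's implementation; the real task_params carries more keys.

```python
from typing import Any, Dict


class Task:
    """Hypothetical stand-in for a generic task; not pydolphinscheduler's Task."""

    def __init__(self, name: str) -> None:
        self.name = name

    @property
    def task_params(self) -> Dict[str, Any]:
        # Most tasks have no dependence, so an empty dict is the default value.
        # A fresh dict is built on every call, so no state is shared between tasks.
        return {"dependence": {}}


class Dependent(Task):
    """Hypothetical dependent task that overrides task_params."""

    def __init__(self, name: str, dependence: Dict[str, Any]) -> None:
        super().__init__(name)
        self.dependence = dependence

    @property
    def task_params(self) -> Dict[str, Any]:
        params = super().task_params
        # Replace the empty default with this task's own dependence definition.
        params["dependence"] = self.dependence
        return params


shell = Task("task_1")
dependent = Dependent("task_dependent", {"relation": "AND"})
print(shell.task_params)      # {'dependence': {}}
print(dependent.task_params)  # {'dependence': {'relation': 'AND'}}
```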
- class pydolphinscheduler.tasks.dependent.DependentDate
Bases: str
Constants of dependent date values.
These values are set according to the Java server models. To add or change a value, change the Java server models first.
- CURRENT_HOUR = 'currentHour'
- LAST_FRIDAY = 'lastFriday'
- LAST_MONDAY = 'lastMonday'
- LAST_MONTH = 'lastMonth'
- LAST_MONTH_BEGIN = 'lastMonthBegin'
- LAST_MONTH_END = 'lastMonthEnd'
- LAST_ONE_DAYS = 'last1Days'
- LAST_ONE_HOUR = 'last1Hour'
- LAST_SATURDAY = 'lastSaturday'
- LAST_SEVEN_DAYS = 'last7Days'
- LAST_SUNDAY = 'lastSunday'
- LAST_THREE_DAYS = 'last3Days'
- LAST_THREE_HOURS = 'last3Hours'
- LAST_THURSDAY = 'lastThursday'
- LAST_TUESDAY = 'lastTuesday'
- LAST_TWENTY_FOUR_HOURS = 'last24Hours'
- LAST_TWO_DAYS = 'last2Days'
- LAST_TWO_HOURS = 'last2Hours'
- LAST_WEDNESDAY = 'lastWednesday'
- LAST_WEEK = 'lastWeek'
- THIS_MONTH = 'thisMonth'
- THIS_WEEK = 'thisWeek'
- TODAY = 'today'
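DependentItem exposes a cycle property alongside date_value. As an assumption based on DolphinScheduler's dependency cycles (hour, day, week, month), the 23 values above fall into four groups. The helper cycle_of below is a hypothetical sketch of that grouping, not the library's code.

```python
# DependentDate values listed above, copied as plain strings.
DEPENDENT_DATE_VALUES = [
    "currentHour", "lastFriday", "lastMonday", "lastMonth", "lastMonthBegin",
    "lastMonthEnd", "last1Days", "last1Hour", "lastSaturday", "last7Days",
    "lastSunday", "last3Days", "last3Hours", "lastThursday", "lastTuesday",
    "last24Hours", "last2Days", "last2Hours", "lastWednesday", "lastWeek",
    "thisMonth", "thisWeek", "today",
]


def cycle_of(date_value: str) -> str:
    """Hypothetical helper: map a date value to an hour/day/week/month cycle."""
    if "Hour" in date_value:
        return "hour"
    if date_value == "today" or "Days" in date_value:
        return "day"
    if "Month" in date_value:
        return "month"
    # thisWeek, lastWeek and the named weekdays (lastMonday ... lastSunday)
    return "week"


by_cycle = {}
for value in DEPENDENT_DATE_VALUES:
    by_cycle.setdefault(cycle_of(value), []).append(value)
print({cycle: len(values) for cycle, values in by_cycle.items()})
# {'hour': 5, 'week': 9, 'month': 4, 'day': 5}
```

The substring checks are safe for this list: lastMonday does not contain Month, so it falls through to week.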
- class pydolphinscheduler.tasks.dependent.DependentItem(project_name: str, workflow_name: str | None = None, dependent_task_name: str | None = '0', dependent_date: DependentDate | None = 'today', *args, **kwargs)
Bases: Base
Dependent item object, the minimal unit of a task dependence.
It declares which project, workflow and task this task depends on.
- _DEFINE_ATTR: set = {'cycle', 'date_value', 'definition_code', 'dep_task_code', 'project_code'}
- property code_parameter: tuple
Get the name information used as the parameter to query codes.
- property cycle: str
Get dependent cycle.
- property date_value: str
Get dependent date.
- property definition_code: str
Get dependent definition code.
- property dep_task_code: str
Get dependent tasks code list.
- property is_all_task: bool
Check whether dependent all tasks or not.
- property project_code: str
Get dependent project code.
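_DEFINE_ATTR lists the properties that get_define() serializes. Assuming Base.get_define reads each listed attribute and, with camel_attr=True, renames it to camelCase, a dependent item would serialize roughly as in the sketch below. get_define, to_camel and FakeDependentItem here are hypothetical, and the stub codes are made up, since real codes are queried from the server.

```python
from typing import Dict, Iterable


def to_camel(name: str) -> str:
    """Convert snake_case to camelCase, e.g. date_value -> dateValue."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def get_define(obj: object, define_attr: Iterable[str], camel_attr: bool = True) -> Dict[str, object]:
    """Hypothetical sketch: read every attribute in define_attr from obj."""
    return {
        (to_camel(attr) if camel_attr else attr): getattr(obj, attr)
        for attr in sorted(define_attr)
    }


class FakeDependentItem:
    """Hypothetical stub; the codes are made up, not queried from a server."""

    project_code = "1001"
    definition_code = "2002"
    dep_task_code = "3003"
    cycle = "day"
    date_value = "today"


# Copied from DependentItem._DEFINE_ATTR above.
DEFINE_ATTR = {"cycle", "date_value", "definition_code", "dep_task_code", "project_code"}

print(get_define(FakeDependentItem(), DEFINE_ATTR))
# {'cycle': 'day', 'dateValue': 'today', 'definitionCode': '2002', 'depTaskCode': '3003', 'projectCode': '1001'}
```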
- class pydolphinscheduler.tasks.dependent.DependentOperator(*args)
Bases: Base
Set DependentItem or dependItemList with a specific operator.
- get_define(camel_attr=True) -> dict
Override Base.get_define to produce the definition specific to task dependent.
- set_define_attr() -> str
Set attributes for function get_define(). It is a wrapper for both the And and Or operators.
- _DEFINE_ATTR: set = {'relation'}
- property relation: str
Get the operator name of the concrete class, for function get_define().
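To picture what set_define_attr() and get_define() assemble, the sketch below uses hypothetical Item, Operator, And and Or classes. It assumes that relation is the subclass name in upper case, that an operator's children must be all items or all operators, and that the list key is dependItemList for items and dependTaskList for operators; these are assumptions, not confirmed library behavior.

```python
from typing import Any, Dict, Union


class Item:
    """Hypothetical leaf standing in for DependentItem."""

    def __init__(self, task: str) -> None:
        self.task = task

    def get_define(self) -> Dict[str, Any]:
        # Placeholder; a real item carries the codes listed in _DEFINE_ATTR.
        return {"task": self.task}


class Operator:
    """Hypothetical base standing in for DependentOperator."""

    def __init__(self, *args: Union["Item", "Operator"]) -> None:
        self.args = args

    @property
    def relation(self) -> str:
        # Assumed: the relation is the subclass name in upper case.
        return type(self).__name__.upper()

    def get_define(self) -> Dict[str, Any]:
        # Assumed: children are all items or all operators; the key depends on which.
        if all(isinstance(a, Item) for a in self.args):
            key = "dependItemList"
        elif all(isinstance(a, Operator) for a in self.args):
            key = "dependTaskList"
        else:
            raise ValueError("mixing items and operators is not supported")
        return {"relation": self.relation, key: [a.get_define() for a in self.args]}


class And(Operator):
    pass


class Or(Operator):
    pass


print(And(Or(Item("task_1"), Item("task_2"))).get_define())
```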
- class pydolphinscheduler.tasks.dependent.Or(*args)
Bases: DependentOperator
Operator Or for task dependent.
It accepts both DependentItem and children of DependentOperator, and sets an OR condition on those args.
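Conceptually, an AND node holds only when every child holds, and an OR node holds when at least one child does. The sketch below models that with a hypothetical tuple-based tree and a dict of upstream task outcomes. It is not pydolphinscheduler code: the real check is done by DolphinScheduler at run time and also takes the dependent_date range into account.

```python
from typing import Mapping, Tuple, Union

# Hypothetical plain-data tree: a task name, or ("and" | "or", children).
Node = Union[str, Tuple[str, list]]


def satisfied(node: Node, succeeded: Mapping[str, bool]) -> bool:
    """Return True if the dependence tree holds for the given task outcomes."""
    if isinstance(node, str):
        # A leaf counts only if that upstream task succeeded.
        return succeeded.get(node, False)
    op, children = node
    results = [satisfied(child, succeeded) for child in children]
    if op == "and":
        return all(results)
    if op == "or":
        return any(results)
    raise ValueError(f"unknown operator: {op!r}")


# Same shape as the Python example: And(Or(task_1, task_2)).
example = ("and", [("or", ["task_1", "task_2"])])
print(satisfied(example, {"task_1": False, "task_2": True}))  # True
print(satisfied(example, {}))                                   # False
```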
YAML file example
```yaml
workflow:
  name: "Dependent"

# Define the tasks within the workflow
tasks:
  - name: dependent
    task_type: Dependent
    denpendence:
      op: and
      groups:
        - op: or
          groups:
            - project_name: pydolphin
              workflow_name: task_dependent_external
              dependent_task_name: task_1
            - project_name: pydolphin
              workflow_name: task_dependent_external
              dependent_task_name: task_2
        - op: and
          groups:
            - project_name: pydolphin
              workflow_name: task_dependent_external
              dependent_task_name: task_1
              dependent_date: LAST_WEDNESDAY
            - project_name: pydolphin
              workflow_name: task_dependent_external
              dependent_task_name: task_2
              dependent_date: last24Hours

  - name: dependent_var
    task_type: Dependent
    denpendence:
      op: and
      groups:
        - op: or
          # ${CONFIG.WORKFLOW_PROJECT} is replaced with the value of configuration.WORKFLOW_PROJECT
          # $WORKFLOW{"Dependent_External.yaml"} creates or updates the workflow defined in Dependent_External.yaml and is replaced with that workflow's name
          groups:
            - project_name: ${CONFIG.WORKFLOW_PROJECT}
              workflow_name: $WORKFLOW{"Dependent_External.yaml"}
              dependent_task_name: task_1
            - project_name: ${CONFIG.WORKFLOW_PROJECT}
              workflow_name: $WORKFLOW{"Dependent_External.yaml"}
              dependent_task_name: task_2
        - op: and
          groups:
            - project_name: ${CONFIG.WORKFLOW_PROJECT}
              workflow_name: $WORKFLOW{"Dependent_External.yaml"}
              dependent_task_name: task_1
              dependent_date: LAST_WEDNESDAY
            - project_name: ${CONFIG.WORKFLOW_PROJECT}
              workflow_name: $WORKFLOW{"Dependent_External.yaml"}
              dependent_task_name: task_2
              dependent_date: last24Hours
```
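This YAML mixes two spellings for dependent_date: the constant name LAST_WEDNESDAY and the value last24Hours. A plausible reading, stated as an assumption, is that the YAML loader accepts either form and maps names onto DependentDate values. The hypothetical helper below sketches that normalization using a subset of the constants listed earlier.

```python
# A subset of the DependentDate constants listed above (name -> value).
DEPENDENT_DATE = {
    "TODAY": "today",
    "LAST_TWENTY_FOUR_HOURS": "last24Hours",
    "LAST_WEDNESDAY": "lastWednesday",
}


def normalize_dependent_date(raw: str) -> str:
    """Hypothetical helper: accept a constant name or a value, return the value."""
    name = raw.upper()
    if name in DEPENDENT_DATE:
        return DEPENDENT_DATE[name]
    if raw in DEPENDENT_DATE.values():
        return raw
    raise ValueError(f"unknown dependent_date: {raw!r}")


print(normalize_dependent_date("LAST_WEDNESDAY"))  # lastWednesday
print(normalize_dependent_date("last24Hours"))     # last24Hours
```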
Dependent_External.yaml:
```yaml
# Define the workflow
workflow:
  name: "task_dependent_external"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }
```
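The comments in the dependent_var task describe two placeholders: ${CONFIG.WORKFLOW_PROJECT} and $WORKFLOW{"Dependent_External.yaml"}. The hypothetical resolver below sketches that substitution with regular expressions. fake_submit is a made-up callback standing in for loading and submitting the referenced file, the config value pydolphin is made up, and the real YAML loader may work differently.

```python
import re
from typing import Callable, Dict, List

CONFIG_RE = re.compile(r"\$\{CONFIG\.(\w+)\}")
WORKFLOW_RE = re.compile(r'\$WORKFLOW\{"([^"]+)"\}')


def resolve(value: str, config: Dict[str, str], submit_file: Callable[[str], str]) -> str:
    """Hypothetical resolver for ${CONFIG.X} and $WORKFLOW{"file"} placeholders."""
    # ${CONFIG.X} -> the configured value of X
    value = CONFIG_RE.sub(lambda m: config[m.group(1)], value)
    # $WORKFLOW{"file"} -> submit the referenced file, then use the workflow name it returns
    return WORKFLOW_RE.sub(lambda m: submit_file(m.group(1)), value)


submitted: List[str] = []


def fake_submit(path: str) -> str:
    """Made-up callback: a real loader would parse and submit the YAML file."""
    submitted.append(path)
    return "task_dependent_external"


config = {"WORKFLOW_PROJECT": "pydolphin"}
print(resolve("${CONFIG.WORKFLOW_PROJECT}", config, fake_submit))  # pydolphin
print(resolve('$WORKFLOW{"Dependent_External.yaml"}', config, fake_submit))  # task_dependent_external
```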