Python Function Wrapper
A decorator covert Python function into pydolphinscheduler’s task.
Example
# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.func_wrap import task
# [end package_import]
# [start task_declare]
@task
def task_parent():
"""First task in this workflow."""
print("echo hello pydolphinscheduler")
@task
def task_child_one():
"""Child task will be run parallel after task ``task_parent`` finished."""
print("echo 'child one'")
@task
def task_child_two():
"""Child task will be run parallel after task ``task_parent`` finished."""
print("echo 'child two'")
@task
def task_union():
"""Last task in this workflow."""
print("echo union")
# [end task_declare]
# [start workflow_declare]
with ProcessDefinition(
name="tutorial_decorator",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
) as pd:
# [end workflow_declare]
# [start task_relation_declare]
task_group = [task_child_one(), task_child_two()]
task_parent().set_downstream(task_group)
task_union() << task_group
# [end task_relation_declare]
# [start submit_or_run]
pd.run()
# [end submit_or_run]
Dive Into
Task function wrapper allows using decorator to create a task.