Python Function Wrapper

A decorator covert Python function into pydolphinscheduler’s task.

Example

# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition

# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.func_wrap import task

# [end package_import]


# [start task_declare]
@task
def task_parent():
    """First task in this workflow."""
    print("echo hello pydolphinscheduler")


@task
def task_child_one():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child one'")


@task
def task_child_two():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child two'")


@task
def task_union():
    """Last task in this workflow."""
    print("echo union")


# [end task_declare]


# [start workflow_declare]
with ProcessDefinition(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
    # [end workflow_declare]

    # [start task_relation_declare]
    task_group = [task_child_one(), task_child_two()]
    task_parent().set_downstream(task_group)

    task_union() << task_group
    # [end task_relation_declare]

    # [start submit_or_run]
    pd.run()
    # [end submit_or_run]

Dive Into

Task function wrapper allows using decorator to create a task.

pydolphinscheduler.tasks.func_wrap._get_func_str(func: LambdaType) str[source]

Get Python function string without indent from decorator.

Parameters:

func – The function which wraps by decorator @task.

pydolphinscheduler.tasks.func_wrap.task(func: LambdaType)[source]

Decorate which covert Python function into pydolphinscheduler task.