Python Function Decorator

A decorator that converts a Python function into a pydolphinscheduler task. Import the decorator with from pydolphinscheduler.tasks.func_wrap import task and apply @task to a Python function to turn it into a single Python task in dolphinscheduler.

Because a whole Python definition may be converted into multiple Python tasks in dolphinscheduler, and each separated task is executed in a different Python process, we need to separate not only the function's code but also all the variables and imported modules that the decorated function relies on.

For example, suppose we decorate the function depend_import in the following definition:

import time

@task
def depend_import():
    time.sleep(2)

The function depend_import depends on another module: it calls time.sleep(2) from the time module to sleep for 2 seconds. So when this function is separated into a dolphinscheduler task, the imported time module must be included as well.

This means we do not post only the code

def depend_import():
    time.sleep(2)

depend_import()

to the dolphinscheduler Python task; we post the dependencies of the function as well, so the dolphinscheduler Python task contains the code below, which makes it work. If the decorated function uses global variables or other functions, they are included too.

import time

def depend_import():
    time.sleep(2)

depend_import()

Note

We use the third-party library stmdency to get the dependency statements of the current function. If you find unexpected behavior, you can report a bug to apache-dolphinscheduler or stmdency.
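To make the idea of dependency extraction concrete, here is a minimal sketch built only on the standard library ast module (Python 3.9+ for ast.unparse). It is not how stmdency or pydolphinscheduler is implemented; the helper name extract_with_imports is hypothetical, and the sketch only resolves top-level imports, not global variables or other functions.

```python
import ast


def extract_with_imports(source: str, func_name: str) -> str:
    """Return a standalone script: needed imports, the function, and a call to it.

    Hypothetical helper for illustration only; it is not part of stmdency
    or pydolphinscheduler.
    """
    tree = ast.parse(source)
    func = next(
        node
        for node in tree.body
        if isinstance(node, ast.FunctionDef) and node.name == func_name
    )
    # Drop decorators such as @task so the script runs as plain Python.
    func.decorator_list = []
    # Collect every bare name referenced inside the function.
    used = {n.id for n in ast.walk(func) if isinstance(n, ast.Name)}
    needed = []
    for node in tree.body:
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            # The name an import binds is its alias, or the first dotted part.
            bound = {(a.asname or a.name).split(".")[0] for a in node.names}
            if bound & used:
                needed.append(node)
    parts = [ast.unparse(n) for n in needed]
    parts += [ast.unparse(func), f"{func_name}()"]
    return "\n\n".join(parts)


SOURCE = '''
import os
import time

@task
def depend_import():
    time.sleep(2)
'''

script = extract_with_imports(SOURCE, "depend_import")
print(script)
```

The unused import os is left out, while import time is kept because the function body refers to the name time. A real extractor also has to follow global variables and helper functions, which is the work delegated to stmdency.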

Example

# [start package_import]
# Import the time module, which the depend_import task below relies on
import time

# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the task decorator, which converts Python functions into tasks
from pydolphinscheduler.tasks.func_wrap import task

# [end package_import]

scope_global = "global-var"


# [start task_declare]
@task
def print_something():
    """First task in this workflow."""
    print("hello python function wrap task")


@task
def depend_import():
    """Depend on import module."""
    time.sleep(2)


@task
def depend_global_var():
    """Depend on global var."""
    print(f"Use global variable {scope_global}")


@task
def depend_local_var():
    """Depend on local variable."""
    scope_global = "local"
    print(f"Use local variable overwrite global {scope_global}")


def foo():
    """Call in other task."""
    print("this is a global function")


@task
def depend_func():
    """Depend on global function."""
    foo()


# [end task_declare]


# [start workflow_declare]
with Workflow(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as workflow:
    # [end workflow_declare]

    # [start task_relation_declare]
    task_group = [depend_import(), depend_global_var()]
    print_something().set_downstream(task_group)

    task_group >> depend_local_var() >> depend_func()
    # [end task_relation_declare]

    # [start submit_or_run]
    workflow.submit()
    # [end submit_or_run]

Dive Into

The task function wrapper allows you to create a task by using a decorator.

pydolphinscheduler.tasks.func_wrap._exists_other_decorator(func: LambdaType) -> None

Check if the function has other decorators except @task.

Parameters:

func – The function wrapped by the @task decorator.
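As an illustration of what such a check involves, the sketch below inspects a function's decorator list with the standard library ast module. It is an assumption-laden example, not the library's implementation: the names has_other_decorator and _decorator_name are hypothetical, the sketch works on a source string rather than a function object, and it returns a boolean, whereas the documented function returns None.

```python
import ast


def _decorator_name(node: ast.expr) -> str:
    """Reduce @task, @task(...), or @module.task to its final name."""
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Attribute):
        return node.attr
    if isinstance(node, ast.Name):
        return node.id
    return ast.unparse(node)


def has_other_decorator(source: str, func_name: str, allowed: str = "task") -> bool:
    """Return True if func_name carries any decorator other than `allowed`.

    Hypothetical helper for illustration; not part of pydolphinscheduler.
    """
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.FunctionDef) and node.name == func_name:
            return any(_decorator_name(d) != allowed for d in node.decorator_list)
    raise ValueError(f"function {func_name!r} not found in source")


SOURCE = '''
@task
def only_task():
    pass

@cache
@task
def stacked():
    pass
'''

print(has_other_decorator(SOURCE, "only_task"))
print(has_other_decorator(SOURCE, "stacked"))
```

A check like this matters because the task body is shipped as source code to another Python process, where an extra decorator and its dependencies may not be available.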

pydolphinscheduler.tasks.func_wrap.task(func: LambdaType)

Decorator that converts Python functions into pydolphinscheduler tasks.

Parameters:

func – The function wrapped by the @task decorator.