Python Function Decorate
A decorator covert Python function into pydolphinscheduler’s task. Python function decorator use decorate
@task
from from pydolphinscheduler.tasks.func_wrap import task
to convert Python function into
a single Python task of dolphinscheduler.
Because we have to covert the whole Python definition into multiple Python task in dolphinscheduler, and all of the seperated Python task will be executed in the different Python process, so we need to separate not only the python function code, but also the all variables and the imported modules related to decorated function.
For example, we decorated function depend_import
in definition
import time
@task
def depend_import():
time.sleep(2)
and we can see functon depend_import
depend on other modules, it use time.sleep(2)
from module time
to sleep 2 seconds. So when we want to separate this function into dolphinscheduler task, need to include the imported
time
module.
which means we not only post code
def depend_import():
time.sleep(2)
depend_import()
to dolphinscheduler Python task, we post the dependencies of this function as well, so you will see this in dolphinscheduler Python task to make it work. And if you use the global variables or other function in the decorated function, it will also including them as well.
import time
def depend_import():
time.sleep(2)
depend_import()
Note
We use third party library stmdency to get the dependencies statement of current function, so if you find some unexpected behavior you can report bug to apache-dolphinscheduler or stmdency.
Example
# [start package_import]
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.func_wrap import task
# [end package_import]
scope_global = "global-var"
# [start task_declare]
@task
def print_something():
"""First task in this workflow."""
print("hello python function wrap task")
@task
def depend_import():
"""Depend on import module."""
time.sleep(2)
@task
def depend_global_var():
"""Depend on global var."""
print(f"Use global variable {scope_global}")
@task
def depend_local_var():
"""Depend on local variable."""
scope_global = "local"
print(f"Use local variable overwrite global {scope_global}")
def foo():
"""Call in other task."""
print("this is a global function")
@task
def depend_func():
"""Depend on global function."""
foo()
# [end task_declare]
# [start workflow_declare]
with Workflow(
name="tutorial_decorator",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
) as workflow:
# [end workflow_declare]
# [start task_relation_declare]
task_group = [depend_import(), depend_global_var()]
print_something().set_downstream(task_group)
task_group >> depend_local_var() >> depend_func()
# [end task_relation_declare]
# [start submit_or_run]
workflow.submit()
# [end submit_or_run]
Dive Into
Task function wrapper allows using decorator to create a task.