Tutorial
This tutorial shows you the basic concepts of PyDolphinScheduler and covers everything you should know before you submit or run your first workflow. If you have not yet installed PyDolphinScheduler or started DolphinScheduler, see the getting started guide for PyDolphinScheduler first.
Overview of Tutorial
Here is an overview of our tutorial. It may look a little complex, but do not worry: we explain this example below in as much detail as possible.
There are two types of tutorials: traditional and task decorator.
Traditional Way: More general. It supports many built-in task types and is convenient when you build your workflow from scratch.
Task Decorator: A Python decorator that wraps your function into a pydolphinscheduler task. It is less versatile than the traditional way because it only supports Python functions and does not support the built-in task types. But it is helpful if your workflow is built entirely with Python, or if you already have some Python workflow code and want to migrate it to pydolphinscheduler.
# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import the Shell task object because we will create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell
# [end package_import]
# [start workflow_declare]
with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
    # [end workflow_declare]
    # [start task_declare]
    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")
    # [end task_declare]

    # [start task_relation_declare]
    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)

    task_union << task_group
    # [end task_relation_declare]

    # [start submit_or_run]
    pd.run()
    # [end submit_or_run]
# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import the task decorator so we can turn Python functions into tasks
from pydolphinscheduler.tasks.func_wrap import task
# [end package_import]
# [start task_declare]
@task
def task_parent():
    """First task in this workflow."""
    print("echo hello pydolphinscheduler")


@task
def task_child_one():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child one'")


@task
def task_child_two():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child two'")


@task
def task_union():
    """Last task in this workflow."""
    print("echo union")
# [end task_declare]
# [start workflow_declare]
with ProcessDefinition(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
    # [end workflow_declare]

    # [start task_relation_declare]
    task_group = [task_child_one(), task_child_two()]
    task_parent().set_downstream(task_group)

    task_union() << task_group
    # [end task_relation_declare]

    # [start submit_or_run]
    pd.run()
    # [end submit_or_run]
Import Necessary Module
First of all, we import the necessary modules that we will use later, just like with other Python packages.
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import the Shell task object because we will create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell
In the traditional tutorial we import pydolphinscheduler.core.process_definition.ProcessDefinition and pydolphinscheduler.tasks.shell.Shell.
If you want to use another task type, you can see all the tasks we support in Tasks.
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import the task decorator so we can turn Python functions into tasks
from pydolphinscheduler.tasks.func_wrap import task
In the task decorator tutorial we import pydolphinscheduler.core.process_definition.ProcessDefinition and pydolphinscheduler.tasks.func_wrap.task().
Process Definition Declaration
We should instantiate a pydolphinscheduler.core.process_definition.ProcessDefinition object after importing it in the section above. Here we declare the basic arguments of the process definition (aka, workflow).
We define the name of the ProcessDefinition using a Python context manager; it is the only required argument for ProcessDefinition. Besides that, we also declare three more arguments: schedule and start_time, which set the workflow's schedule interval and schedule start time, and tenant, which defines the tenant that will run the tasks in the DolphinScheduler worker. See the section tenant in PyDolphinScheduler Concepts for more information.
with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
with ProcessDefinition(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
You can find more detail about ProcessDefinition in the concept section on process definitions if you are interested. All arguments of the process definition object are listed in the pydolphinscheduler.core.process_definition API documentation.
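To make the point about required arguments concrete, here is a minimal sketch: name is the only required argument, so a workflow without a schedule can be declared as below. The workflow name "minimal_workflow" is made up for this example, and every other attribute falls back to its default.
from pydolphinscheduler.core.process_definition import ProcessDefinition

# ``name`` is the only required argument; without ``schedule`` the workflow
# has no schedule attached when it is later submitted.
with ProcessDefinition(name="minimal_workflow") as pd_minimal:
    pass  # task declarations would go here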
Task Declaration
We declare four tasks to show how to create tasks, and all of them are simple tasks of pydolphinscheduler.tasks.shell which run an echo command in the terminal. Besides the argument command holding the echo command, we also need to set the argument name for each task (not only for shell tasks: `name` is required for every type of task).
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
Besides the shell task, PyDolphinScheduler supports multiple task types; you can find them in Tasks.
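For example, here is a sketch of one other built-in task type, the HTTP task, which calls a URL instead of running a shell command. The import path pydolphinscheduler.tasks.http and the url argument name are assumptions taken from the Tasks reference and may differ between versions, so double-check them before use.
# Hedged sketch of another task type: an HTTP task that calls a URL.
# The ``url`` argument name is an assumption here; see the Tasks docs.
from pydolphinscheduler.tasks.http import Http

task_http = Http(name="task_http", url="https://example.com/health")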
We declare four tasks to show how to create tasks, and all of them are created with the task decorator pydolphinscheduler.tasks.func_wrap.task(). All we have to do is add the @task decorator to an existing Python function, and then use the function inside pydolphinscheduler.core.process_definition.
@task
def task_parent():
    """First task in this workflow."""
    print("echo hello pydolphinscheduler")


@task
def task_child_one():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child one'")


@task
def task_child_two():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child two'")


@task
def task_union():
    """Last task in this workflow."""
    print("echo union")
This makes our workflow more Pythonic, but be careful: when we use task decorator mode, we can only use Python functions as tasks and, in most cases, cannot use the built-in task types.
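Because the task body is plain Python, a decorated task can use the standard library directly. Below is a small sketch; the function name task_report is made up for this example, and the import is kept inside the function on the assumption that only the function body is shipped to the worker.
@task
def task_report():
    """Print the current timestamp using plain Python code."""
    # Import inside the function so it is available where the body runs.
    import datetime

    print(f"report generated at {datetime.datetime.now()}")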
Setting Task Dependence
After we declare both the process definition and the tasks, we have four tasks that are independent and would run in parallel. If you want one task to start only after another task has finished, you have to set dependencies between those tasks.
Setting task dependence is quite easy using the task methods set_downstream and set_upstream, or using the bitwise operators >> and <<.
In this tutorial, task task_parent is the leading task of the whole workflow, and tasks task_child_one and task_child_two are its downstream tasks. Task task_union will not run until both task_child_one and task_child_two are done, because both of them are task_union's upstream tasks.
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
task_union << task_group
task_group = [task_child_one(), task_child_two()]
task_parent().set_downstream(task_group)
task_union() << task_group
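For comparison, the same relations can also be expressed with the bitwise operators alone; this sketch uses the traditional tasks and is equivalent to the group-based declaration above.
# Equivalent dependence declaration using only the bitwise operators.
task_parent >> task_child_one
task_parent >> task_child_two
task_union << task_child_one
task_union << task_child_two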
Note
We can set task dependence in batch mode if the tasks share the same downstream or upstream, by declaring those tasks as a task group. In this tutorial, we declare tasks task_child_one and task_child_two as a task group named task_group, then set task_group as downstream of task task_parent. See Tasks Dependence for more detail about how to set task dependence.
Submit Or Run Workflow
With that, we have finished our workflow definition, with four tasks and their dependencies. But all of this is still local; we have to let the DolphinScheduler daemon know about the workflow definition. So the last thing we do is submit the workflow to the DolphinScheduler daemon.
Fortunately, ProcessDefinition provides a convenient method, run, to submit the workflow; it creates the workflow definition as well as the workflow schedule.
pd.run()
pd.run()
Finally, we can execute this workflow code in a terminal like any other Python script, running python tutorial.py to trigger and execute it.
Note
If you have not started your DolphinScheduler API server, see Start Python Gateway Service for how to start it. Besides the method run, the ProcessDefinition object also has a method submit, which only submits the workflow to the daemon and does not set the workflow schedule information. For more detail, see Process Definition.
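As a short sketch of the difference, submitting without scheduling looks like this:
# Unlike ``run``, ``submit`` only sends the workflow definition to the
# daemon and does not set the workflow schedule information.
pd.submit()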
DAG Graph After Tutorial Run
After we run the tutorial code, you can log in to the DolphinScheduler web UI and go to the DolphinScheduler project page. There is a new process definition created by PyDolphinScheduler, named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like this:
                 --> task_child_one
               /                    \
task_parent -->                      --> task_union
               \                    /
                 --> task_child_two