Tutorial
This tutorial shows you the basic concepts of PyDolphinScheduler and tells you everything you should know before you submit or run your first workflow. If you have not yet installed PyDolphinScheduler and started Apache DolphinScheduler, see the getting started guide for PyDolphinScheduler first.
Overview of Tutorial
Here is an overview of our tutorial. It may look a little complex, but do not worry, because we explain this example below in as much detail as possible.
# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell
# [end package_import]
# [start workflow_declare]
# [start workflow_declare]
with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
    # [end workflow_declare]
    # [start task_declare]
    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")
    # [end task_declare]
    # [start task_relation_declare]
    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)
    task_union << task_group
    # [end task_relation_declare]
    # [start submit_or_run]
    pd.run()
    # [end submit_or_run]
Import Necessary Modules
First of all, we import the necessary modules we will use later, just as we would
with any other Python package. Since this is a minimal demo, we only import
pydolphinscheduler.core.process_definition
and
pydolphinscheduler.tasks.shell
.
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell
If you want to use other task types, you can click through and see all the tasks we support.
Process Definition Declaration
After importing, we instantiate the objects. Here we declare the basic arguments for the process definition (aka, workflow) using a Python context manager. We define the name of the process definition, which is the only required argument. Besides that, we also declare the arguments schedule and start_time, which set the workflow's schedule interval and schedule start time, and the argument tenant, which changes the user that the workflow's tasks run as on the worker. The tenant section in the PyDolphinScheduler Concepts page has more detailed information.
with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
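The schedule string above follows the Quartz-style cron format with seven fields: second, minute, hour, day of month, month, day of week, and year. As a quick illustration (plain Python, no PyDolphinScheduler required; the helper `describe_cron` is hypothetical and for explanation only), you can label each field of the tutorial's expression:

```python
# Field names of a 7-field Quartz-style cron expression, in order.
FIELDS = [
    "second", "minute", "hour",
    "day of month", "month", "day of week", "year",
]

def describe_cron(expression: str) -> dict:
    """Map each field of a 7-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return dict(zip(FIELDS, parts))

# "0 0 0 * * ? *" fires at 00:00:00 every day; "?" means
# "no specific value" for the day-of-week field.
print(describe_cron("0 0 0 * * ? *"))
```

So the tutorial workflow is scheduled to run once per day at midnight, starting from the declared start_time.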
You can find more detail about process definitions in the
process definition concept documentation if you are interested.
For all arguments of the process definition object, see the
pydolphinscheduler.core.process_definition
API documentation.
Task Declaration
Here we declare four tasks, all of them simple tasks of
pydolphinscheduler.tasks.shell
which run an echo command in the terminal.
Besides the argument command, we also need to set the argument name for each task (not
only shell tasks; `name` is required for every task type).
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
Besides the shell task, PyDolphinScheduler supports multiple task types, which you can find in Tasks.
Setting Task Dependence
After declaring both the process definition and the tasks, we have one workflow with four tasks, but all the tasks are independent, so they would run in parallel. We should set the order and the dependence of the tasks. This is useful when we need to run a preparation task before the actual task, or when tasks must run under specific rules. We support both the methods set_downstream and set_upstream, and the bitwise operators >> and <<.
In this example, we set task task_parent as the upstream task of tasks task_child_one and task_child_two, and task task_union as the downstream task of both of these two tasks.
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
task_union << task_group
Please notice that we can group tasks and set their dependence together if they share the same downstream or upstream. Here we declare tasks task_child_one and task_child_two as a group named task_group, and set task task_parent as the upstream of both of them. You can see more detail in the Tasks Dependence section of the concept documentation.
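The bitwise operators are ordinary Python operator overloads, so `a >> b` and `a.set_downstream(b)` express the same relation. A minimal toy sketch (plain Python; this is an illustration, not PyDolphinScheduler's actual Task implementation) shows how the tutorial's dependence code could work:

```python
# Toy model of task dependence for illustration only.
# "a >> b" / "a.set_downstream(b)" both mean: a runs before b.

class Task:
    def __init__(self, name):
        self.name = name
        self.downstream = set()  # names of tasks that run after this one

    def set_downstream(self, tasks):
        # Accept a single task or a list of tasks, like the tutorial code.
        for task in tasks if isinstance(tasks, list) else [tasks]:
            self.downstream.add(task.name)

    def set_upstream(self, tasks):
        for task in tasks if isinstance(tasks, list) else [tasks]:
            task.downstream.add(self.name)

    def __rshift__(self, other):   # self >> other: other depends on self
        self.set_downstream(other)
        return other

    def __lshift__(self, other):   # self << other: self depends on other
        self.set_upstream(other)
        return other

parent = Task("task_parent")
child_one, child_two = Task("task_child_one"), Task("task_child_two")
union = Task("task_union")

# Same shape as the tutorial: parent fans out to the group,
# and the group fans in to union.
parent.set_downstream([child_one, child_two])
union << [child_one, child_two]
print(sorted(parent.downstream))
```

With this model, `parent >> [child_one, child_two]` would record exactly the same edges as the `set_downstream` call, which is why the tutorial can mix both styles freely.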
Submit Or Run Workflow
Now we have finished our workflow definition, with its tasks and task dependence, but all of this exists only locally; we should let the Apache DolphinScheduler daemon know how we defined our workflow. So the last thing we have to do here is submit our workflow to the Apache DolphinScheduler daemon.
In this example we use the ProcessDefinition method run to submit the workflow to the daemon and set the schedule time we declared in the process definition declaration.
Now we can run the Python code like any other Python script; for basic usage, run
python tutorial.py
to trigger and execute it.
pd.run()
If you have not started your Apache DolphinScheduler server, you can find out how in Start Python Gateway Server, which has more detail about starting the related servers. Besides the method run, the ProcessDefinition object also has a method submit, which just submits the workflow to the daemon without setting the schedule information. For more detail, see Process Definition.
DAG Graph After Tutorial Run
After we run the tutorial code, you can log in to the Apache DolphinScheduler web UI and go to the DolphinScheduler project page. There you will see a new process definition named "tutorial". It was created by PyDolphinScheduler, and its DAG graph is as below:
                --> task_child_one
              /                    \
task_parent -->                      --> task_union
              \                    /
                --> task_child_two