Tutorial
This tutorial shows you the basic concept of PyDolphinScheduler and tells all things you should know before you submit or run your first workflow. If you still have not installed PyDolphinScheduler and started DolphinScheduler, you could go and see how to get started with PyDolphinScheduler firstly.
Overview of Tutorial
Here, we have an overview of our tutorial, and it looks a little complex but don’t worry about that because we will explain this example below in as much detail as possible.
There are two types of tutorials: traditional and task decorator.
Traditional Way: More general, support many built-in task types, it is convenient when you build your workflow at the beginning.
Task Decorator: A Python decorator that allows you to wrap your function into pydolphinscheduler’s task. Less versatility to the traditional way because it only supports Python functions without build-in tasks supported. But it is helpful if your workflow is all built with Python or if you already have some Python workflow code and want to migrate them to pydolphinscheduler.
YAML File: We can use pydolphinscheduler CLI to create workflow using YAML file:
pydolphinscheduler yaml -f tutorial.yaml
. We can find more YAML file examples in examples/yaml_define
# [start package_import]
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell
# [end package_import]
# [start workflow_declare]
with Workflow(
name="tutorial",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
) as workflow:
# [end workflow_declare]
# [start task_declare]
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(
name="task_child_one",
command="""
echo "Executing line 1 with parameter str type ${param1}"
echo "Executing line 2 with parameter int type ${param2}"
echo "Executing line 3 with parameter build-in parameter currently date ${param3}"
""",
params={"param1": "str1", "param2": 123, "param3": "$[yyyy-MM-dd]"},
)
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
# [start resource_limit]
resource_limit = Shell(
name="resource_limit",
command="echo resource limit",
cpu_quota=1,
memory_max=100,
)
# [end resource_limit]
# [end task_declare]
# [start task_relation_declare]
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
resource_limit << task_union << task_group
# [end task_relation_declare]
# [start submit_or_run]
workflow.run()
# [end submit_or_run]
# [start package_import]
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.func_wrap import task
# [end package_import]
scope_global = "global-var"
# [start task_declare]
@task
def print_something():
"""First task in this workflow."""
print("hello python function wrap task")
@task
def depend_import():
"""Depend on import module."""
time.sleep(2)
@task
def depend_global_var():
"""Depend on global var."""
print(f"Use global variable {scope_global}")
@task
def depend_local_var():
"""Depend on local variable."""
scope_global = "local"
print(f"Use local variable overwrite global {scope_global}")
def foo():
"""Call in other task."""
print("this is a global function")
@task
def depend_func():
"""Depend on global function."""
foo()
# [end task_declare]
# [start workflow_declare]
with Workflow(
name="tutorial_decorator",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
) as workflow:
# [end workflow_declare]
# [start task_relation_declare]
task_group = [depend_import(), depend_global_var()]
print_something().set_downstream(task_group)
task_group >> depend_local_var() >> depend_func()
# [end task_relation_declare]
# [start submit_or_run]
workflow.submit()
# [end submit_or_run]
# Define the workflow
workflow:
name: "tutorial"
schedule: "0 0 0 * * ? *"
start_time: "2021-01-01"
release_state: "offline"
run: true
# Define the tasks within the workflow
tasks:
- name: task_parent
task_type: Shell
command: echo hello pydolphinscheduler
- name: task_child_one
task_type: Shell
deps: [task_parent]
command: echo "child one"
- name: task_child_two
task_type: Shell
deps: [task_parent]
command: echo "child two"
- name: task_union
task_type: Shell
deps: [task_child_one, task_child_two]
command: echo "union"
Import Necessary Module
First of all, we should import the necessary module which we would use later just like other Python packages.
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell
In tradition tutorial we import pydolphinscheduler.core.workflow.Workflow
and
pydolphinscheduler.tasks.shell.Shell
.
If you want to use other task type you could click and see all tasks we support
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.func_wrap import task
In task decorator tutorial we import pydolphinscheduler.core.workflow.Workflow
and
pydolphinscheduler.tasks.func_wrap.task()
.
workflow Declaration
We should instantiate pydolphinscheduler.core.workflow.Workflow
object after we
import them from import necessary module. Here we declare basic arguments for workflow.
We define the name of Workflow
, using Python context manager and it the only required argument
for Workflow. Besides, we also declare three arguments named schedule
and start_time
which sets workflow schedule interval and schedule start_time, and argument tenant
defines which tenant
will be running this task in the DolphinScheduler worker. See section tenant in
PyDolphinScheduler Concepts for more information.
with Workflow(
name="tutorial",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
) as workflow:
with Workflow(
name="tutorial_decorator",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
) as workflow:
# Define the workflow
workflow:
name: "tutorial"
schedule: "0 0 0 * * ? *"
start_time: "2021-01-01"
release_state: "offline"
run: true
We could find more details about Workflow
in concept about workflow
if you are interested in it. For all arguments of object workflow, you could find in the
pydolphinscheduler.core.workflow
API documentation.
Task Declaration
We declare four tasks to show how to create tasks, and all of them are simple tasks of
pydolphinscheduler.tasks.shell
which runs echo command in the terminal. Besides the argument
command with echo
command, we also need to set the argument name for each task
(not only shell task, `name` is required for each type of task).
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(
name="task_child_one",
command="""
echo "Executing line 1 with parameter str type ${param1}"
echo "Executing line 2 with parameter int type ${param2}"
echo "Executing line 3 with parameter build-in parameter currently date ${param3}"
""",
params={"param1": "str1", "param2": 123, "param3": "$[yyyy-MM-dd]"},
)
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
# [start resource_limit]
resource_limit = Shell(
name="resource_limit",
command="echo resource limit",
cpu_quota=1,
memory_max=100,
)
# [end resource_limit]
Besides shell task, PyDolphinScheduler supports multiple tasks and you could find in Tasks.
We declare four tasks to show how to create tasks, and all of them are created by the task decorator which
using pydolphinscheduler.tasks.func_wrap.task()
. All we have to do is add a decorator named
@task
to existing Python function, and then use them inside pydolphinscheduler.core.workflow
@task
def print_something():
"""First task in this workflow."""
print("hello python function wrap task")
@task
def depend_import():
"""Depend on import module."""
time.sleep(2)
@task
def depend_global_var():
"""Depend on global var."""
print(f"Use global variable {scope_global}")
@task
def depend_local_var():
"""Depend on local variable."""
scope_global = "local"
print(f"Use local variable overwrite global {scope_global}")
def foo():
"""Call in other task."""
print("this is a global function")
@task
def depend_func():
"""Depend on global function."""
foo()
It makes our workflow more Pythonic, but be careful that when we use task decorator mode, it means we only use Python function as a task and could not use the built-in tasks most of the cases.
tasks:
- name: task_parent
task_type: Shell
command: echo hello pydolphinscheduler
- name: task_child_one
task_type: Shell
deps: [task_parent]
command: echo "child one"
- name: task_child_two
task_type: Shell
deps: [task_parent]
command: echo "child two"
- name: task_union
task_type: Shell
deps: [task_child_one, task_child_two]
command: echo "union"
Setting Task Dependence
After we declare both workflow and task, we have four tasks that are independent and will be running in parallel. If you want to start one task until some task is finished, you have to set dependence on those tasks.
Set task dependence is quite easy by task’s attribute set_downstream
and set_upstream
or by
bitwise operators >>
and <<
In this tutorial, task task_parent is the leading task of the whole workflow, then task task_child_one and task task_child_two are its downstream tasks. Task task_union will not run unless both task task_child_one and task task_child_two was done, because both two task is task_union’s upstream.
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
resource_limit << task_union << task_group
task_group = [depend_import(), depend_global_var()]
print_something().set_downstream(task_group)
task_group >> depend_local_var() >> depend_func()
We can use deps:[]
to set task dependence
tasks:
- name: task_parent
task_type: Shell
command: echo hello pydolphinscheduler
- name: task_child_one
task_type: Shell
deps: [task_parent]
command: echo "child one"
- name: task_child_two
task_type: Shell
deps: [task_parent]
command: echo "child two"
- name: task_union
task_type: Shell
deps: [task_child_one, task_child_two]
command: echo "union"
Note
We could set task dependence in batch mode if they have the same downstream or upstream by declaring those tasks as task groups. In tutorial, We declare task task_child_one and task_child_two as task group named task_group, then set task_group as downstream of task task_parent. You could see more detail in Tasks Dependence for more detail about how to set task dependence.
Submit Or Run Workflow
After that, we finish our workflow definition, with four tasks and task dependence, but all these things are local, we should let the DolphinScheduler daemon know how the definition of workflow. So the last thing we have to do is submit the workflow to the DolphinScheduler daemon.
Fortunately, we have a convenient method to submit workflow via Workflow attribute run
which
will create workflow definition as well as workflow schedule.
workflow.run()
workflow.submit()
pydolphinscheduler YAML CLI always submit workflow. We can run the workflow if we set run: true
# Define the workflow
workflow:
name: "tutorial"
run: true
At last, we could execute this workflow code in your terminal like other Python scripts, running
python tutorial.py
to trigger and execute it.
Note
If you do not start your DolphinScheduler API server, you could find how to start it in
Start Python Gateway Service for more detail. Besides attribute run
, we have attribute
submit
for object Workflow which just submits workflow to the daemon but does not set
the workflow schedule information. For more detail, you could see Workflow.
DAG Graph After Tutorial Run
After we run the tutorial code, you could log in DolphinScheduler web UI, go and see the DolphinScheduler project page. They is a new workflow be created by PyDolphinScheduler and it named “tutorial” or “tutorial_decorator”. The task graph of workflow like below:
/ \
task_parent --> --> task_union
\ /
--> task_child_two
Create Workflow Using YAML File
We can use pydolphinscheduler CLI to create workflow using YAML file
pydolphinscheduler yaml -f Shell.yaml
We can use the following four special grammars to define workflows more flexibly.
$FILE{"file_name"}
: Read the file (file_name
) contents and replace them to that location.$WORKFLOW{"other_workflow.yaml"}
: Refer to another workflow defined using YAML file (other_workflow.yaml
) and replace the workflow name in this location.$ENV{env_name}
: Read the environment variable (env_name
) and replace it to that location.${CONFIG.key_name}
: Read the configuration value of key (key_name
) and it them to that location.
In addition, when loading the file path use $FILE{"file_name"}
or $WORKFLOW{"other_workflow.yaml"}
, pydolphinscheduler will search in the path of the YAMl file if the file does not exist.
For exmaples, our file directory structure is as follows:
.
└── yaml_define
├── Condition.yaml
├── DataX.yaml
├── Dependent_External.yaml
├── Dependent.yaml
├── example_datax.json
├── example_sql.sql
├── example_sub_workflow.yaml
├── Flink.yaml
├── Http.yaml
├── MapReduce.yaml
├── MoreConfiguration.yaml
├── Procedure.yaml
├── Python.yaml
├── Shell.yaml
├── Spark.yaml
├── Sql.yaml
├── SubWorkflow.yaml
└── Switch.yaml
After we run
pydolphinscheduler yaml -file yaml_define/SubWorkflow.yaml
the $WORKFLOW{"example_sub_workflow.yaml"}
will be set to $WORKFLOW{"yaml_define/example_sub_workflow.yaml"}
, because ./example_sub_workflow.yaml
does not exist and yaml_define/example_sub_workflow.yaml
does.
Furthermore, this feature supports recursion all the way down.