Tutorial

This tutorial shows you the basic concepts of PyDolphinScheduler and covers everything you should know before you submit or run your first workflow. If you have not yet installed PyDolphinScheduler and started DolphinScheduler, see the Getting Started guide first.

Overview of Tutorial

Here is an overview of the tutorial. It may look a little complex, but do not worry: we explain this example below in as much detail as possible.

There are three ways to define a workflow: traditional, task decorator, and YAML file.

  • Traditional Way: The most general approach. It supports many built-in task types and is convenient when you build your workflow from scratch.

  • Task Decorator: A Python decorator that wraps your function into a pydolphinscheduler task. It is less versatile than the traditional way because it only supports Python functions and none of the built-in task types, but it is helpful if your workflow is built entirely with Python, or if you already have some Python workflow code and want to migrate it to pydolphinscheduler.

  • YAML File: We can use the pydolphinscheduler CLI to create a process from a YAML file: pydolphinscheduler yaml -f tutorial.yaml. You can find more YAML examples in examples/yaml_define

# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition

# Import task Shell object because we will create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell

# [end package_import]

# [start workflow_declare]
with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
    # [end workflow_declare]
    # [start task_declare]
    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")
    # [end task_declare]

    # [start task_relation_declare]
    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)

    task_union << task_group
    # [end task_relation_declare]

    # [start submit_or_run]
    pd.run()
    # [end submit_or_run]

# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition

# Import the task decorator because we will wrap Python functions into tasks later
from pydolphinscheduler.tasks.func_wrap import task

# [end package_import]


# [start task_declare]
@task
def task_parent():
    """First task in this workflow."""
    print("echo hello pydolphinscheduler")


@task
def task_child_one():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child one'")


@task
def task_child_two():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child two'")


@task
def task_union():
    """Last task in this workflow."""
    print("echo union")


# [end task_declare]


# [start workflow_declare]
with ProcessDefinition(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
    # [end workflow_declare]

    # [start task_relation_declare]
    task_group = [task_child_one(), task_child_two()]
    task_parent().set_downstream(task_group)

    task_union() << task_group
    # [end task_relation_declare]

    # [start submit_or_run]
    pd.run()
    # [end submit_or_run]

# Define the workflow
workflow:
  name: "tutorial"
  schedule: "0 0 0 * * ? *"
  start_time: "2021-01-01"
  tenant: "tenant_exists"
  release_state: "offline"
  run: true

# Define the tasks under the workflow
tasks:
  - name: task_parent
    task_type: Shell
    command: echo hello pydolphinscheduler

  - name: task_child_one
    task_type: Shell
    deps: [task_parent]
    command: echo "child one"

  - name: task_child_two
    task_type: Shell
    deps: [task_parent]
    command: echo "child two"

  - name: task_union
    task_type: Shell
    deps: [task_child_one, task_child_two]
    command: echo "union"

Import Necessary Module

First of all, we should import the necessary modules that we will use later, just as with any other Python package.

# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition

# Import task Shell object because we will create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell

In the traditional tutorial we import pydolphinscheduler.core.process_definition.ProcessDefinition and pydolphinscheduler.tasks.shell.Shell.

If you want to use another task type, see the full list of tasks we support in Tasks.

# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition

# Import the task decorator because we will wrap Python functions into tasks later
from pydolphinscheduler.tasks.func_wrap import task

In the task decorator tutorial we import pydolphinscheduler.core.process_definition.ProcessDefinition and pydolphinscheduler.tasks.func_wrap.task().

Process Definition Declaration

We should instantiate a pydolphinscheduler.core.process_definition.ProcessDefinition object after importing it in the previous step. Here we declare the basic arguments of the process definition (aka workflow). We define the name of the ProcessDefinition using a Python context manager; name is the only required argument for ProcessDefinition. Besides that, we declare three more arguments: schedule and start_time, which set the workflow's schedule interval and schedule start time, and tenant, which defines which tenant will run this task on the DolphinScheduler worker. See the section tenant in PyDolphinScheduler Concepts for more information.

with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:

with ProcessDefinition(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:

# Define the workflow
workflow:
  name: "tutorial"
  schedule: "0 0 0 * * ? *"
  start_time: "2021-01-01"
  tenant: "tenant_exists"
  release_state: "offline"
  run: true
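The schedule value "0 0 0 * * ? *" is a Quartz-style cron expression with seven space-separated fields (second, minute, hour, day of month, month, day of week, year), so this workflow runs once a day at midnight. The following plain-Python sketch (illustration only — pydolphinscheduler itself parses the schedule on the server side) simply labels each field:

```python
# Label the seven fields of a Quartz-style cron expression.
# Illustration only; not part of the pydolphinscheduler API.
FIELDS = ["second", "minute", "hour", "day_of_month", "month", "day_of_week", "year"]


def describe(cron: str) -> dict:
    """Map each cron field name to its value in the expression."""
    values = cron.split()
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    return dict(zip(FIELDS, values))


print(describe("0 0 0 * * ? *"))
```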

You can find more detail about ProcessDefinition in the concept section Process Definition. For all arguments of the process definition object, see the pydolphinscheduler.core.process_definition API documentation.

Task Declaration

We declare four tasks to show how to create tasks; all of them are simple pydolphinscheduler.tasks.shell tasks that run an echo command in the terminal. Besides the argument command holding the echo command, we also need to set the argument name for each task (not only for the shell task: `name` is required for every task type).

    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")

Besides the shell task, PyDolphinScheduler supports many other task types; you can find them all in Tasks.

We declare four tasks to show how to create tasks, all of them created with the task decorator pydolphinscheduler.tasks.func_wrap.task(). All we have to do is add the @task decorator to an existing Python function, and then use it inside a pydolphinscheduler.core.process_definition context.

@task
def task_parent():
    """First task in this workflow."""
    print("echo hello pydolphinscheduler")


@task
def task_child_one():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child one'")


@task
def task_child_two():
    """Child task will be run parallel after task ``task_parent`` finished."""
    print("echo 'child two'")


@task
def task_union():
    """Last task in this workflow."""
    print("echo union")


It makes our workflow more Pythonic, but be aware that in task decorator mode we can only use Python functions as tasks, and in most cases cannot use the built-in task types.
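To get an intuition for what such a decorator does, here is a toy sketch in plain Python (this is not the real func_wrap.task implementation): the decorator wraps the function so that calling it produces a task object carrying the function, instead of executing the function immediately.

```python
import functools


class PythonTask:
    """Toy stand-in for a pydolphinscheduler task (illustration only)."""

    def __init__(self, func):
        self.name = func.__name__
        self.func = func


def task(func):
    """Toy decorator: calling the decorated function returns a task object."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Do not run func here; defer execution to the scheduler.
        return PythonTask(func)

    return wrapper


@task
def task_union():
    """Last task in this workflow."""
    print("echo union")


t = task_union()  # does not print; returns a PythonTask object
print(t.name)
```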

tasks:
  - name: task_parent
    task_type: Shell
    command: echo hello pydolphinscheduler

  - name: task_child_one
    task_type: Shell
    deps: [task_parent]
    command: echo "child one"

  - name: task_child_two
    task_type: Shell
    deps: [task_parent]
    command: echo "child two"

  - name: task_union
    task_type: Shell
    deps: [task_child_one, task_child_two]
    command: echo "union"

Setting Task Dependence

After declaring both the process definition and the tasks, we have four independent tasks that will run in parallel. If you want one task to start only after another task is finished, you have to set a dependence between those tasks.

Setting task dependence is quite easy with the task methods set_downstream and set_upstream, or with the bitwise operators >> and <<.

In this tutorial, task task_parent is the leading task of the whole workflow, and tasks task_child_one and task_child_two are its downstream tasks. Task task_union will not run until both task_child_one and task_child_two are done, because both of them are task_union's upstream tasks.

    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)

    task_union << task_group

    task_group = [task_child_one(), task_child_two()]
    task_parent().set_downstream(task_group)

    task_union() << task_group
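The dependence operations above can be illustrated with a toy model in plain Python (not the real pydolphinscheduler Task class), showing how set_downstream and the << operator build the same parent/child links:

```python
class Task:
    """Toy task with upstream/downstream links (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.downstream = []
        self.upstream = []

    def set_downstream(self, tasks):
        for t in tasks if isinstance(tasks, list) else [tasks]:
            self.downstream.append(t)
            t.upstream.append(self)

    def __lshift__(self, tasks):
        # task << others: the others become this task's upstream
        for t in tasks if isinstance(tasks, list) else [tasks]:
            t.set_downstream(self)
        return self


task_parent = Task("task_parent")
task_child_one = Task("task_child_one")
task_child_two = Task("task_child_two")
task_union = Task("task_union")

task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
task_union << task_group
```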

We can use deps: [...] to set task dependence:

tasks:
  - name: task_parent
    task_type: Shell
    command: echo hello pydolphinscheduler

  - name: task_child_one
    task_type: Shell
    deps: [task_parent]
    command: echo "child one"

  - name: task_child_two
    task_type: Shell
    deps: [task_parent]
    command: echo "child two"

  - name: task_union
    task_type: Shell
    deps: [task_child_one, task_child_two]
    command: echo "union"
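A small plain-Python sketch (illustration only, not pydolphinscheduler code) of how the deps entries above determine an execution order, via a simple topological sort:

```python
# The task -> dependency mapping taken from the YAML above.
deps = {
    "task_parent": [],
    "task_child_one": ["task_parent"],
    "task_child_two": ["task_parent"],
    "task_union": ["task_child_one", "task_child_two"],
}


def topo_order(deps):
    """Return tasks in an order where every task follows all its dependencies."""
    done, order = set(), []
    while len(done) < len(deps):
        ready = sorted(
            t for t, d in deps.items() if t not in done and all(x in done for x in d)
        )
        if not ready:
            raise ValueError("dependency cycle detected")
        done.update(ready)
        order.extend(ready)
    return order


print(topo_order(deps))
```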

Note

We can set task dependence in batch mode when tasks share the same downstream or upstream, by declaring those tasks as a group. In this tutorial, we declare tasks task_child_one and task_child_two as a group named task_group, then set task_group as the downstream of task task_parent. See Tasks Dependence for more detail about how to set task dependence.

Submit Or Run Workflow

With that, we have finished our workflow definition, with four tasks and their dependence, but everything is still local: the DolphinScheduler daemon does not yet know about the workflow definition. So the last thing we have to do is submit the workflow to the DolphinScheduler daemon.

Fortunately, there is a convenient way to submit the workflow: the ProcessDefinition method run, which creates the workflow definition as well as the workflow schedule.

    pd.run()

The pydolphinscheduler YAML CLI always submits the workflow. We can also run the workflow by setting run: true:

# Define the workflow
workflow:
  name: "tutorial"
  run: true

Finally, we can execute this workflow code in the terminal like any other Python script, running python tutorial.py to trigger and execute it.

Note

If you have not started your DolphinScheduler API server, see Start Python Gateway Service for how to start it. Besides run, ProcessDefinition also has a method submit, which only submits the workflow to the daemon without setting the workflow schedule information. For more detail, see Process Definition.

DAG Graph After Tutorial Run

After we run the tutorial code, you can log in to the DolphinScheduler web UI and go to the DolphinScheduler project page. There is a new process definition created by PyDolphinScheduler, named “tutorial” or “tutorial_decorator”. The task graph of the workflow looks like this:

                  --> task_child_one
                /                    \
task_parent -->                        -->  task_union
                \                    /
                  --> task_child_two

Create Process Using YAML File

We can use the pydolphinscheduler CLI to create a process from a YAML file:

pydolphinscheduler yaml -f Shell.yaml

We can use the following four special grammars to define workflows more flexibly.

  • $FILE{"file_name"}: Read the contents of the file (file_name) and substitute them at that location.

  • $WORKFLOW{"other_workflow.yaml"}: Refer to another process defined in a YAML file (other_workflow.yaml) and substitute its process name at this location.

  • $ENV{env_name}: Read the environment variable (env_name) and substitute it at that location.

  • ${CONFIG.key_name}: Read the configuration value of the key (key_name) and substitute it at that location.
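As an illustration of the substitution idea, here is a minimal regex-based sketch of the $ENV{env_name} grammar in plain Python (not pydolphinscheduler's actual implementation):

```python
import os
import re


def expand_env(text: str) -> str:
    """Replace every $ENV{name} occurrence with that environment variable's value."""
    return re.sub(r"\$ENV\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), text)


# Example usage; DS_TENANT is a made-up variable name for this sketch.
os.environ["DS_TENANT"] = "tenant_exists"
print(expand_env("tenant: $ENV{DS_TENANT}"))
```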

In addition, when loading a file path with $FILE{"file_name"} or $WORKFLOW{"other_workflow.yaml"}, pydolphinscheduler will search the directory of the YAML file if the given file does not exist.

For example, suppose our file directory structure is as follows:

.
└── yaml_define
    ├── Condition.yaml
    ├── DataX.yaml
    ├── Dependent_External.yaml
    ├── Dependent.yaml
    ├── example_datax.json
    ├── example_sql.sql
    ├── example_subprocess.yaml
    ├── Flink.yaml
    ├── Http.yaml
    ├── MapReduce.yaml
    ├── MoreConfiguration.yaml
    ├── Procedure.yaml
    ├── Python.yaml
    ├── Shell.yaml
    ├── Spark.yaml
    ├── Sql.yaml
    ├── SubProcess.yaml
    └── Switch.yaml

After we run

pydolphinscheduler yaml -f yaml_define/SubProcess.yaml

the $WORKFLOW{"example_subprocess.yaml"} will be set to $WORKFLOW{"yaml_define/example_subprocess.yaml"}, because ./example_subprocess.yaml does not exist and yaml_define/example_subprocess.yaml does.

Furthermore, this feature supports recursion all the way down: a referenced YAML file may itself contain these special grammars.