Tutorial

This tutorial introduces the basic concepts of PyDolphinScheduler and covers everything you should know before you submit or run your first workflow. If you have not yet installed PyDolphinScheduler or started DolphinScheduler, see how to get started with PyDolphinScheduler first.

Overview of Tutorial

Here is an overview of the tutorial. It may look a little complex at first, but don't worry: we explain the example below in as much detail as possible.

This tutorial covers three ways to define a workflow: the traditional way, the task decorator, and a YAML file.

  • Traditional Way: More general; it supports many built-in task types and is convenient when you first build your workflow.

  • Task Decorator: A Python decorator that wraps your function into a pydolphinscheduler task. It is less versatile than the traditional way because it only supports Python functions and none of the built-in task types, but it is helpful if your workflow is built entirely with Python, or if you already have Python workflow code and want to migrate it to pydolphinscheduler.

  • YAML File: We can use the pydolphinscheduler CLI to create a workflow from a YAML file: pydolphinscheduler yaml -f tutorial.yaml. More YAML examples can be found in examples/yaml_define.

The three listings below show the complete tutorial example in each style: first the traditional way, then the task decorator, and finally the YAML file.

# [start package_import]
# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the Shell task object because we will create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell

# [end package_import]

# [start workflow_declare]
with Workflow(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
) as workflow:
    # [end workflow_declare]
    # [start task_declare]
    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(
        name="task_child_one",
        command="""
        echo "Executing line 1 with parameter str type ${param1}"
        echo "Executing line 2 with parameter int type ${param2}"
        echo "Executing line 3 with parameter build-in parameter currently date ${param3}"
        """,
        params={"param1": "str1", "param2": 123, "param3": "$[yyyy-MM-dd]"},
    )
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")

    # [start resource_limit]
    resource_limit = Shell(
        name="resource_limit",
        command="echo resource limit",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]
    # [end task_declare]

    # [start task_relation_declare]
    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)

    resource_limit << task_union << task_group
    # [end task_relation_declare]

    # [start submit_or_run]
    workflow.run()
    # [end submit_or_run]

# [start package_import]
import time  # standard-library module used by the depend_import task below

# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the task decorator to wrap Python functions into tasks
from pydolphinscheduler.tasks.func_wrap import task

# [end package_import]

scope_global = "global-var"


# [start task_declare]
@task
def print_something():
    """First task in this workflow."""
    print("hello python function wrap task")


@task
def depend_import():
    """Depend on import module."""
    time.sleep(2)


@task
def depend_global_var():
    """Depend on global var."""
    print(f"Use global variable {scope_global}")


@task
def depend_local_var():
    """Depend on local variable."""
    scope_global = "local"
    print(f"Use local variable overwrite global {scope_global}")


def foo():
    """Call in other task."""
    print("this is a global function")


@task
def depend_func():
    """Depend on global function."""
    foo()


# [end task_declare]


# [start workflow_declare]
with Workflow(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
) as workflow:
    # [end workflow_declare]

    # [start task_relation_declare]
    task_group = [depend_import(), depend_global_var()]
    print_something().set_downstream(task_group)

    task_group >> depend_local_var() >> depend_func()
    # [end task_relation_declare]

    # [start submit_or_run]
    workflow.submit()
    # [end submit_or_run]

# Define the workflow
workflow:
  name: "tutorial"
  schedule: "0 0 0 * * ? *"
  start_time: "2021-01-01"
  release_state: "offline"
  run: true

# Define the tasks within the workflow
tasks:
  - name: task_parent
    task_type: Shell
    command: echo hello pydolphinscheduler

  - name: task_child_one
    task_type: Shell
    deps: [task_parent]
    command: echo "child one"

  - name: task_child_two
    task_type: Shell
    deps: [task_parent]
    command: echo "child two"

  - name: task_union
    task_type: Shell
    deps: [task_child_one, task_child_two]
    command: echo "union"

Import Necessary Modules

First of all, we import the necessary modules that we will use later, just as we would for any other Python package.

# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the Shell task object because we will create some shell tasks later
from pydolphinscheduler.tasks.shell import Shell

In the traditional tutorial we import pydolphinscheduler.core.workflow.Workflow and pydolphinscheduler.tasks.shell.Shell.

If you want to use other task types, see Tasks for all the task types we support.

import time  # standard-library module used by the depend_import task below

# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the task decorator to wrap Python functions into tasks
from pydolphinscheduler.tasks.func_wrap import task

In the task decorator tutorial we import pydolphinscheduler.core.workflow.Workflow and pydolphinscheduler.tasks.func_wrap.task().

Workflow Declaration

After the imports, we instantiate a pydolphinscheduler.core.workflow.Workflow object using a Python context manager and declare its basic arguments. We define the name of the Workflow, which is its only required argument. Besides that, we declare two arguments, schedule and start_time, which set the workflow's schedule interval and the time the schedule starts from. The argument tenant defines which tenant will run the tasks on the DolphinScheduler worker; see the section tenant in PyDolphinScheduler Concepts for more information.

with Workflow(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
) as workflow:

with Workflow(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
) as workflow:

# Define the workflow
workflow:
  name: "tutorial"
  schedule: "0 0 0 * * ? *"
  start_time: "2021-01-01"
  release_state: "offline"
  run: true

You can find more details about Workflow in the workflow concept documentation if you are interested. All arguments of the Workflow object are listed in the pydolphinscheduler.core.workflow API documentation.
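The tutorial code above does not set the tenant argument explicitly. Below is a minimal sketch that does; the tenant name tenant_exists is illustrative and must already exist in your DolphinScheduler instance:

with Workflow(
    name="tutorial_with_tenant",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    # illustrative tenant name; it must already exist in DolphinScheduler
    tenant="tenant_exists",
) as workflow:
    ...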

Task Declaration

We declare four tasks to show how to create them; all of them are simple tasks of pydolphinscheduler.tasks.shell that run an echo command in the terminal. Besides the argument command holding the echo command, we also need to set the argument name for each task (and not only for shell tasks: name is required for every task type). We declare one more task, resource_limit, to show how to restrict the CPU quota and maximum memory a task may use.

    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
    task_child_one = Shell(
        name="task_child_one",
        command="""
        echo "Executing line 1 with parameter str type ${param1}"
        echo "Executing line 2 with parameter int type ${param2}"
        echo "Executing line 3 with parameter build-in parameter currently date ${param3}"
        """,
        params={"param1": "str1", "param2": 123, "param3": "$[yyyy-MM-dd]"},
    )
    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
    task_union = Shell(name="task_union", command="echo union")

    # [start resource_limit]
    resource_limit = Shell(
        name="resource_limit",
        command="echo resource limit",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]

Besides the shell task, PyDolphinScheduler supports multiple task types, all of which you can find in Tasks.
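For instance, here is a minimal sketch of declaring another built-in task type, assuming the Python task type and its definition argument as documented for your pydolphinscheduler version:

# Import the Python task object, another built-in task type
from pydolphinscheduler.tasks.python import Python

# Declared like a Shell task inside a Workflow context; the definition
# argument holds the Python code the task will execute on the worker
task_python = Python(name="task_python", definition="print('hello python task')")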

In the decorator tutorial, the tasks are created with the task decorator pydolphinscheduler.tasks.func_wrap.task(). All we have to do is add the @task decorator to an existing Python function, and then use the decorated functions inside pydolphinscheduler.core.workflow.

@task
def print_something():
    """First task in this workflow."""
    print("hello python function wrap task")


@task
def depend_import():
    """Depend on import module."""
    time.sleep(2)


@task
def depend_global_var():
    """Depend on global var."""
    print(f"Use global variable {scope_global}")


@task
def depend_local_var():
    """Depend on local variable."""
    scope_global = "local"
    print(f"Use local variable overwrite global {scope_global}")


def foo():
    """Call in other task."""
    print("this is a global function")


@task
def depend_func():
    """Depend on global function."""
    foo()


This makes our workflow more Pythonic, but be careful: when we use task decorator mode, we can only use Python functions as tasks and, in most cases, cannot use the built-in task types. In a YAML file, tasks are declared as a list under the tasks key:

tasks:
  - name: task_parent
    task_type: Shell
    command: echo hello pydolphinscheduler

  - name: task_child_one
    task_type: Shell
    deps: [task_parent]
    command: echo "child one"

  - name: task_child_two
    task_type: Shell
    deps: [task_parent]
    command: echo "child two"

  - name: task_union
    task_type: Shell
    deps: [task_child_one, task_child_two]
    command: echo "union"

Setting Task Dependence

After we declare both the workflow and its tasks, the tasks are independent of each other and would all run in parallel. If you want one task to start only after another task has finished, you have to set a dependence between those tasks.

Setting task dependence is quite easy with the task methods set_downstream and set_upstream, or with the bitwise operators >> and <<.

In this tutorial, task task_parent is the leading task of the whole workflow, and tasks task_child_one and task_child_two are its downstream tasks. Task task_union will not run until both task_child_one and task_child_two are done, because both of them are task_union's upstream tasks.
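For instance, the following statements, a minimal sketch reusing two tasks from this tutorial, are four equivalent ways to declare that task_child_one runs after task_parent:

    # each line declares the same dependence: task_parent -> task_child_one
    task_parent.set_downstream(task_child_one)
    task_child_one.set_upstream(task_parent)
    task_parent >> task_child_one
    task_child_one << task_parent

The tutorial itself declares its dependences as follows: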

    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)

    resource_limit << task_union << task_group

    task_group = [depend_import(), depend_global_var()]
    print_something().set_downstream(task_group)

    task_group >> depend_local_var() >> depend_func()

In a YAML file, we can use deps: [...] to set task dependence:

tasks:
  - name: task_parent
    task_type: Shell
    command: echo hello pydolphinscheduler

  - name: task_child_one
    task_type: Shell
    deps: [task_parent]
    command: echo "child one"

  - name: task_child_two
    task_type: Shell
    deps: [task_parent]
    command: echo "child two"

  - name: task_union
    task_type: Shell
    deps: [task_child_one, task_child_two]
    command: echo "union"

Note

We can set task dependence in batch mode when tasks share the same downstream or upstream, by declaring those tasks in a list. In this tutorial, we declare tasks task_child_one and task_child_two in a list named task_group, then set task_group as the downstream of task task_parent. See Tasks Dependence for more detail about how to set task dependence.

Submit Or Run Workflow

With that, our workflow definition is finished, with its tasks and task dependence, but all of it still exists only locally; the DolphinScheduler daemon knows nothing about the workflow definition. So the last thing we have to do is submit the workflow to the DolphinScheduler daemon.

Fortunately, we have a convenient method to do this: the Workflow method run, which creates the workflow definition as well as the workflow schedule.

    workflow.run()     # traditional tutorial: create the definition and schedule, then run
    workflow.submit()  # decorator tutorial: only submit the definition to the daemon

The pydolphinscheduler YAML CLI always submits the workflow. We can also run the workflow by setting run: true:

# Define the workflow
workflow:
  name: "tutorial"
  run: true

At last, we can execute this workflow code in a terminal like any other Python script, running python tutorial.py to trigger and execute it.

Note

If you have not started your DolphinScheduler API server, see Start Python Gateway Service for how to start it. Besides the method run, the Workflow object also has a method submit, which just submits the workflow to the daemon but does not set the workflow schedule information. For more detail, see Workflow.
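A minimal sketch of the difference between the two methods, with comments restating the behavior described above:

    workflow.submit()  # create the workflow definition on the daemon, without schedule information
    workflow.run()     # create the workflow definition and its schedule, then trigger a run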

DAG Graph After Tutorial Run

After we run the tutorial code, you can log in to the DolphinScheduler web UI and open the DolphinScheduler project page. There you will find a new workflow created by PyDolphinScheduler, named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like this:

                  --> task_child_one
                /                    \
task_parent -->                        -->  task_union
                \                    /
                  --> task_child_two

Create Workflow Using YAML File

We can use the pydolphinscheduler CLI to create a workflow from a YAML file:

pydolphinscheduler yaml -f Shell.yaml

We can use the following four special syntaxes to define workflows more flexibly, as the hypothetical snippet after this list shows.

  • $FILE{"file_name"}: Read the contents of the file (file_name) and insert them at that location.

  • $WORKFLOW{"other_workflow.yaml"}: Refer to another workflow defined in a YAML file (other_workflow.yaml) and insert that workflow's name at this location.

  • $ENV{env_name}: Read the environment variable (env_name) and insert its value at that location.

  • ${CONFIG.key_name}: Read the configuration value of the key (key_name) and insert it at that location.
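For example, a hypothetical snippet combining two of these syntaxes; the file example_command.sh and the environment variable GREETING are illustrative, not part of this tutorial:

tasks:
  - name: task_from_file
    task_type: Shell
    # $FILE is replaced by the contents of example_command.sh
    command: $FILE{"example_command.sh"}

  - name: task_from_env
    task_type: Shell
    # $ENV is replaced by the value of the environment variable GREETING
    command: echo $ENV{GREETING}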

In addition, when a file path loaded with $FILE{"file_name"} or $WORKFLOW{"other_workflow.yaml"} does not exist, pydolphinscheduler will also search relative to the directory of the YAML file itself.

For example, suppose our directory structure is as follows:

.
└── yaml_define
    ├── Condition.yaml
    ├── DataX.yaml
    ├── Dependent_External.yaml
    ├── Dependent.yaml
    ├── example_datax.json
    ├── example_sql.sql
    ├── example_sub_workflow.yaml
    ├── Flink.yaml
    ├── Http.yaml
    ├── MapReduce.yaml
    ├── MoreConfiguration.yaml
    ├── Procedure.yaml
    ├── Python.yaml
    ├── Shell.yaml
    ├── Spark.yaml
    ├── Sql.yaml
    ├── SubWorkflow.yaml
    └── Switch.yaml

After we run

pydolphinscheduler yaml -f yaml_define/SubWorkflow.yaml

the reference $WORKFLOW{"example_sub_workflow.yaml"} will be resolved to $WORKFLOW{"yaml_define/example_sub_workflow.yaml"}, because ./example_sub_workflow.yaml does not exist while yaml_define/example_sub_workflow.yaml does.

Furthermore, this feature supports recursion all the way down: if a referenced workflow itself references other files, those references are resolved in the same way.