Concepts

In this section, you will learn the core concepts of PyDolphinScheduler.

Workflow

Workflow describes everything about a workflow except its tasks and task dependencies, including the name, schedule interval, and schedule start and end time.

A workflow can be initialized with a normal assignment statement or within a context manager.

# Initialization with an assignment statement
workflow = Workflow(name="my first workflow")

# Or with a context manager
with Workflow(name="my first workflow") as workflow:
    workflow.submit()

Workflow is the main object for communication between PyDolphinScheduler and the DolphinScheduler daemon. After the workflow and its tasks are declared, you can use submit and run to send your definition to the server.

If you just want to submit your definition and create the workflow without running it, use the method submit. If you want to run the workflow right after you submit it, use the method run.

# Just submit the definition, without running it
workflow.submit()

# Both submit and run the definition
workflow.run()

Schedule

We use the parameter schedule to determine the schedule interval of the workflow. PyDolphinScheduler supports a seven-field crontab expression, and the meaning of each position is as below:

* * * * * * *
┬ ┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ │ │
│ │ │ │ │ │ └─── year
│ │ │ │ │ └───── day of week (1 - 7) (1 to 7 are Sunday to Saturday, or use names such as `SUN`)
│ │ │ │ └─────── month (1 - 12)
│ │ │ └───────── day of month (1 - 31)
│ │ └─────────── hour (0 - 23)
│ └───────────── min (0 - 59)
└─────────────── second (0 - 59)

Here are some example crontab expressions, followed by a sketch of attaching one to a workflow:

  • 0 0 0 * * ? *: the workflow executes every day at 00:00:00.

  • 10 2 * * * ? *: the workflow executes every hour, at two minutes and ten seconds past the hour (HH:02:10).

  • 10,11 20 0 1,2 * ? *: the workflow executes on the first and second day of each month at 00:20:10 and 00:20:11.
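
You attach such an expression to a workflow through the parameter schedule, usually together with a start time. A minimal sketch using the parameters schedule and start_time (the workflow name here is illustrative):

# Run every day at 00:00:00, starting from 2021-01-01
workflow = Workflow(
    name="my scheduled workflow",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
)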

Tenant

Tenant is the user who runs the task command on the machine or in the virtual machine. It can be assigned as a simple string. You should change the tenant value to a tenant that exists on your host; it is configured in config.yaml under your pydolphinscheduler PYDS_HOME, or via the CLI:

pydolphinscheduler config --set default.user.tenant <YOUR-TENANT-NAME>
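
You can also assign the tenant directly when declaring a workflow, as a simple string; the value below is a placeholder for a tenant that exists on your host:

# Assign an existing tenant to the workflow
workflow = Workflow(name="my first workflow", tenant="<YOUR-TENANT-NAME>")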

Note

Make sure the tenant exists on the target machine, otherwise an error will be raised when you try to run the command.

Execution Type

Decides which behavior to use when a workflow has multiple instances. When the workflow schedule interval is too short, multiple instances may run at the same time. We can use this parameter to control how those workflow instances run. Currently there are four execution types:

  • parallel (default value): all instances are allowed to run even if the previous instance has not finished.

  • serial_wait: all instances wait for the previous instance to finish, and the waiting instances are executed based on scheduling order.

  • serial_discard: an instance is discarded (abandoned) if the previous instance has not finished.

  • serial_priority: all instances wait for the previous instance to finish, and the waiting instances are executed based on workflow priority order.

The parameter execution_type can be set in two ways:

  • Direct assignment. Pick an execution type from the list above and assign it directly to the parameter execution_type.

    workflow = Workflow(
        name="workflow_name",
        execution_type="parallel"
    )
    
  • Via environment variables or the configuration file. For more detail about these settings, read the Configuration section.

Alert

Alert is the way to notify users when a workflow instance succeeds or fails. We can set alerts with the parameters warning_type and warning_group_id in the workflow definition, as sketched after the list below.

  • warning_type represents the type of alert: when a workflow instance ends in one of the covered statuses, it triggers an alert. The value of warning_type can be one of failure, success, all, none.

  • warning_group_id represents the alert group; you can get the group id from the DolphinScheduler web UI.
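
A minimal sketch combining both parameters (the group id 1 here is a placeholder; look up the real id in the web UI):

# Alert on both success and failure, sending to alert group 1
workflow = Workflow(
    name="workflow_with_alert",
    warning_type="all",
    warning_group_id=1,
)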

Tasks

Task is the minimum unit that runs an actual job, and it is a node of the DAG (directed acyclic graph). You can define whatever you want in a task. It has some required parameters that give it uniqueness and a definition.

Here we use pydolphinscheduler.tasks.Shell() as an example. The parameters name and command are required and must be provided. Parameter name sets the name of the task, and parameter command declares the command you wish to run in this task.

# We name this task "shell", and it just runs the command `echo shell task`
shell_task = Shell(name="shell", command="echo shell task")

If you want to see all the task types, see Tasks.

Tasks Dependence

You can define many tasks in one single Workflow. If all those tasks run in parallel, you can leave them alone without adding any additional information. But if some tasks should not run unless an earlier task in the workflow has finished, we should set task dependencies on them. There are two main ways to set task dependencies, and both are easy: the bitwise operators >> and <<, or the task methods set_downstream and set_upstream.

# Set task1 as task2's upstream
task1 >> task2
# You could use the method `set_downstream` too; it is the same as `task1 >> task2`
task1.set_downstream(task2)

# Set task1 as task2's downstream
task1 << task2
# It is the same as the method `set_upstream`
task1.set_upstream(task2)

# Besides, we can set dependencies between a task and a sequence of tasks:
# here we set `task1` as upstream of both `task2` and `task3`. This is useful
# when several tasks share the same dependency.
task1 >> [task2, task3]

Task With Workflow

In most data orchestration cases, you should assign the attribute workflow to a task instance to decide which workflow the task belongs to. You can set the workflow with a normal assignment or in context manager mode:

# Normal assignment: explicitly declare and pass the `Workflow` instance to the task
workflow = Workflow(name="my first workflow")
shell_task = Shell(name="shell", command="echo shell task", workflow=workflow)

# Context manager: the `Workflow` instance is implicitly attached to the task
with Workflow(name="my first workflow") as workflow:
    shell_task = Shell(name="shell", command="echo shell task")

With Workflow, Tasks, and Tasks Dependence together, we can build a workflow with multiple tasks.
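
Putting these together, a minimal end-to-end sketch (task names and commands are illustrative):

# A three-task workflow: `extract` runs first, then `transform`, then `load`
with Workflow(name="my etl workflow") as workflow:
    extract = Shell(name="extract", command="echo extract")
    transform = Shell(name="transform", command="echo transform")
    load = Shell(name="load", command="echo load")

    extract >> transform
    transform >> load

    workflow.run()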

Task Group

A task group can manage and control the maximum number of concurrently running tasks. This is particularly useful when you want to limit the simultaneous execution of various task types. For instance, in an ETL (Extract, Transform, Load) job where data is extracted from a source database, it’s crucial to control the parallelism of extract tasks to prevent an excessive number of connections to the source database. This is where a task group comes into play. There are two key parameters, task_group_id and task_group_priority, that determine the behavior of the task group.

  • task_group_id: an integer used to identify the task group. You can set a task_group_id to restrict the parallelism of tasks. The task_group_id can be found in the DolphinScheduler web UI. The default value is 0, which means there are no restrictions for this task group.

  • task_group_priority: an integer used to define the priority of the task group. When different tasks share the same task_group_id, the task group’s priority comes into play, controlling the order in which they run. Higher values indicate higher priority. The default value is 0, which means there’s no specific priority for this task group, and tasks run in the order they were created.

Here’s an example in Python:

extract = Shell(
   name="extract",
   command="echo 'Some extract command here'",
   task_group_id=1,
   task_group_priority=123
)

Resource Files

During workflow runs, we usually need some resource files to help run tasks. A common situation is that we already have some executable files locally and need to schedule them at a specific time, or add them to an existing workflow as new tasks. Of course, we could upload those files to the target machine and run them in a shell task by referencing the file's absolute path. But if we have more than one machine running tasks, we would have to upload those files to each of them. That is neither convenient nor flexible, because we may need to change our resource files from time to time.

A more PyDolphinScheduler-native way is to upload those files together with the workflow and use them in tasks. For example, say you have a bash script named echo-ten.sh locally that contains code like this:

#!/usr/bin/env bash
max=10
for ((i=1; i <= $max; ++i)); do
    echo "$i"
done

and you want to use it in a workflow without copy-pasting it into the shell task's command parameter. You can use this mechanism to upload it to the resource center when you create the workflow:

# Read the file content
file_name = "echo-ten.sh"

with open(file_name, "r") as f:
    content = f.read()

with Workflow(
   name="upload_and_run",
   resource_list=[
      Resource(name=file_name, content=content),
   ],
) as workflow:
    # declare tasks that use the resource here, then submit or run the workflow
    ...

When we call workflow.run(), the new file named echo-ten.sh is uploaded to the DolphinScheduler resource center.

After that, we can use this file in our tasks by referencing it by name; in this case we use a shell task to run it.

# We use a `shell` task to run the `echo-ten.sh` file
shell_task = Shell(
   name="run",
   command=f"bash {file_name}",
   resource_list=[
      file_name
   ],
)

During the workflow run, the file is downloaded to the task's runtime working directory, which means you can execute it. Here we execute the file with bash and reference it by name directly.

Note that we can also reference resource files that already exist in the DolphinScheduler resource center, which means we can use resource center files in tasks by name without uploading them again. And we can upload files to the resource center on their own, without a workflow:

# Upload file to resource center
from pydolphinscheduler.core.resource import Resource

resource = Resource(
    name="bare-create.py",
    user_name="<USER-MUST-EXISTS-WITH-TENANT>",
    content="print('Bare create resource')",
)
resource.create_or_update_resource()

After that, we can see a new file named bare-create.py created in the resource center.

Note

The parameter resource_list in both workflow and task is a list, which means you can upload and reference multiple files, as sketched below. For more complex usage, read Upload and Use Multiple Resources.
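
A minimal sketch with two illustrative files (names and contents are placeholders):

# Upload two scripts with the workflow, and reference both in one task
with Workflow(
    name="multiple_resources",
    resource_list=[
        Resource(name="step-1.sh", content="echo step 1"),
        Resource(name="step-2.sh", content="echo step 2"),
    ],
) as workflow:
    both = Shell(
        name="run-both",
        command="bash step-1.sh && bash step-2.sh",
        resource_list=["step-1.sh", "step-2.sh"],
    )
    workflow.run()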

Local Parameters

In DolphinScheduler, we can define parameters in a task, aka local parameters.

We can set parameters as variables in tasks to better manage our tasks.

For example:

    # define a parameter "a", and use it in Shell task
    example1_input_params = Shell(
        name="example1_input_params",
        command="echo ${a}",
        input_params={
            "a": "123",
        },
    )

    # define a parameter "random_value", and pass it to the downstream tasks
    example2_output_params = Shell(
        name="example2_output_params",
        command="""
        val=$(echo $RANDOM)
        echo "#{setValue(random_value=${val})}"
        echo $val
        """,
        output_params={
            "random_value": "",
        },
    )

    # use the parameter "random_value", from upstream tasks
    # we don't need to define input_params again if the parameter is from upstream tasks
    example2_input_params = Shell(
        name="example2_input_params", command="""echo ${random_value}"""
    )

    example2_output_params >> example2_input_params

There are two ways to define local parameters:

    # Add parameter via task arguments
    task_1 = Shell(
        name="task_1",
        command="echo hello pydolphinscheduler",
        input_params={
            "value_VARCHAR": "abc",
            "value_INTEGER": 123,
            "value_FLOAT": 0.1,
            "value_BOOLEAN": True,
        },
        output_params={
            "value_EMPTY": None,
        },
    )

    # Add parameter via task instance's method
    task_2 = Shell(name="task_2", command="echo hello pydolphinscheduler")

    task_2.add_in("value_VARCHAR", "abc")
    task_2.add_in("value_INTEGER", 123)
    task_2.add_in("value_FLOAT", 0.1)
    task_2.add_in("value_BOOLEAN", True)
    task_2.add_out("value_EMPTY")

    # Task 1 is the same as task 2

    # Other parameter types, which cannot be converted automatically, must declare their type explicitly
    task_3 = Shell(
        name="task_3",
        command="echo '123' >> test.txt",
        input_params={
            "value_LONG": ParameterType.LONG("1000000"),
            "value_DATE": ParameterType.DATE("2022-01-02"),
            "value_TIME": ParameterType.TIME("2022-01-01"),
            "value_TIMESTAMP": ParameterType.TIMESTAMP(123123124125),
            "value_LIST": ParameterType.LIST("123123"),
        },
        output_params={
            "output_INTEGER": ParameterType.INTEGER(100),
            "output_LIST": ParameterType.LIST(),
            "output_FILE": ParameterType.FILE("test.txt"),
        },
    )

    workflow.submit()

Full example:

r"""
A tutorial example set local parameter in pydolphinscheduler.

Method 1:
    task = Shell(..., input_params={"input":"a"}, output_params={"output": "b"})

Method 2:
    task = Shell(...)
    task.add_in("input", "a")
    task.add_out("output", "b")
"""

from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="local_parameter_example", release_state="offline") as workflow:
    # [start parameter example]
    # define a parameter "a", and use it in Shell task
    example1_input_params = Shell(
        name="example1_input_params",
        command="echo ${a}",
        input_params={
            "a": "123",
        },
    )

    # define a parameter "random_value", and pass it to the downstream tasks
    example2_output_params = Shell(
        name="example2_output_params",
        command="""
        val=$(echo $RANDOM)
        echo "#{setValue(random_value=${val})}"
        echo $val
        """,
        output_params={
            "random_value": "",
        },
    )

    # use the parameter "random_value", from upstream tasks
    # we don't need to define input_params again if the parameter is from upstream tasks
    example2_input_params = Shell(
        name="example2_input_params", command="""echo ${random_value}"""
    )

    example2_output_params >> example2_input_params
    # [end parameter example]

    # [start parameter define]
    # Add parameter via task arguments
    task_1 = Shell(
        name="task_1",
        command="echo hello pydolphinscheduler",
        input_params={
            "value_VARCHAR": "abc",
            "value_INTEGER": 123,
            "value_FLOAT": 0.1,
            "value_BOOLEAN": True,
        },
        output_params={
            "value_EMPTY": None,
        },
    )

    # Add parameter via task instance's method
    task_2 = Shell(name="task_2", command="echo hello pydolphinscheduler")

    task_2.add_in("value_VARCHAR", "abc")
    task_2.add_in("value_INTEGER", 123)
    task_2.add_in("value_FLOAT", 0.1)
    task_2.add_in("value_BOOLEAN", True)
    task_2.add_out("value_EMPTY")

    # Task 1 is the same as task 2

    # Other parameter types, which cannot be converted automatically, must declare their type explicitly
    task_3 = Shell(
        name="task_3",
        command="echo '123' >> test.txt",
        input_params={
            "value_LONG": ParameterType.LONG("1000000"),
            "value_DATE": ParameterType.DATE("2022-01-02"),
            "value_TIME": ParameterType.TIME("2022-01-01"),
            "value_TIMESTAMP": ParameterType.TIMESTAMP(123123124125),
            "value_LIST": ParameterType.LIST("123123"),
        },
        output_params={
            "output_INTEGER": ParameterType.INTEGER(100),
            "output_LIST": ParameterType.LIST(),
            "output_FILE": ParameterType.FILE("test.txt"),
        },
    )

    workflow.submit()
    # [end parameter define]

Authentication Token

PyDolphinScheduler uses a token for authentication when communicating with the DolphinScheduler server, and a default auth token is shipped to make it work out of the box. For security reasons, we highly recommend changing to your own auth token when you deploy to a production environment or test DolphinScheduler on a public network. The auth token keyword is auth_token, and it can be set in multiple ways; read the Configuration section for more detail.
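
For example, you can set it with the same CLI used for the tenant above; this sketch assumes the token lives under the java_gateway section of config.yaml, as in the default configuration:

pydolphinscheduler config --set java_gateway.auth_token <YOUR-AUTH-TOKEN>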