Sub Workflow

A sub workflow task triggers a run of an existing workflow, so make sure that workflow exists in the current project when you create the sub workflow task.

Example

Here is a simple example of how to use a sub workflow task. Before we create a sub workflow task, we must make sure the workflow it refers to already exists in the current project, so the first thing we do is create the workflow that the sub workflow task will use.

with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
    name="task_sub_workflow_example"
) as wf_upstream:
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    wf_downstream.submit()

The workflow named sub_workflow_downstream is created once we call its submit method.
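Submitting first matters because the sub workflow task looks up its target workflow by name. The following minimal sketch models a project as an in-memory name-to-code registry. `ProjectRegistry`, `submit`, `lookup` and `WorkflowNotFound` are hypothetical names for illustration only, not pydolphinscheduler APIs; the real lookup goes through the Java gateway.

```python
from itertools import count


class WorkflowNotFound(Exception):
    """Raised when a name is looked up before any workflow was submitted under it."""


class ProjectRegistry:
    """Hypothetical in-memory stand-in for a DolphinScheduler project."""

    def __init__(self) -> None:
        self._codes: dict[str, int] = {}
        self._next_code = count(1000)

    def submit(self, workflow_name: str) -> int:
        # Submitting registers the workflow under its name and assigns a code;
        # resubmitting an existing name keeps its original code.
        if workflow_name not in self._codes:
            self._codes[workflow_name] = next(self._next_code)
        return self._codes[workflow_name]

    def lookup(self, workflow_name: str) -> int:
        # A sub workflow task can only reference a workflow that already exists.
        try:
            return self._codes[workflow_name]
        except KeyError:
            raise WorkflowNotFound(workflow_name) from None


project = ProjectRegistry()
try:
    project.lookup("sub_workflow_downstream")  # not submitted yet
except WorkflowNotFound as exc:
    print(f"missing: {exc}")
code = project.submit("sub_workflow_downstream")
print(project.lookup("sub_workflow_downstream") == code)  # True
```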

Then, in the main workflow, we create a sub workflow task that points to the workflow we created before.

    sw_task = SubWorkflow(
        name="sub_workflow",
        workflow_name=wf_downstream.name,
        workflow=wf_upstream,
    )

Finally, submit or run the main workflow with its submit or run method. You can also point the sub workflow task at a workflow that already exists in the current project instead of creating a new one.

Note

We can only run the workflow that contains the sub workflow task; the sub workflow task then triggers a run of the sub workflow.
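A hypothetical simulation of that run order: running the main workflow runs its own tasks, and when it reaches the sub workflow task, that task triggers a run of the sub workflow. `SimWorkflow` is made up for illustration and ignores the scheduler, task codes, and parallel execution.

```python
from dataclasses import dataclass, field


@dataclass
class SimWorkflow:
    """Hypothetical workflow model: an ordered list of steps."""

    name: str
    # Each step is either a shell task name (str) or a nested SimWorkflow,
    # which stands in for a sub workflow task pointing at that workflow.
    steps: list = field(default_factory=list)

    def run(self, log: list[str]) -> list[str]:
        log.append(f"start {self.name}")
        for step in self.steps:
            if isinstance(step, SimWorkflow):
                # The sub workflow task triggers a run of the child workflow.
                step.run(log)
            else:
                log.append(f"run {step}")
        log.append(f"end {self.name}")
        return log


downstream = SimWorkflow("sub_workflow_downstream", ["task_sub_workflow"])
upstream = SimWorkflow("task_sub_workflow_example", ["pre-task", downstream])

for line in upstream.run([]):
    print(line)
```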

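from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.sub_workflow import SubWorkflow

# [start sub_workflow_declare]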
with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
    name="task_sub_workflow_example"
) as wf_upstream:
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    wf_downstream.submit()
    # [end sub_workflow_declare]

    sub_workflow_pre = Shell(
        name="pre-task",
        command="echo 'prefix task for sub workflow'",
        workflow=wf_upstream,
    )
    # [start sub_workflow_task_declare]
    sw_task = SubWorkflow(
        name="sub_workflow",
        workflow_name=wf_downstream.name,
        workflow=wf_upstream,
    )
    # [end sub_workflow_task_declare]
    sub_workflow_pre >> sw_task
    # Make sure the workflow named `wf_downstream.name` exists before you submit or run the sub workflow task
    wf_upstream.run()
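In the full example, `sub_workflow_pre >> sw_task` makes `pre-task` run before the sub workflow task. The sketch below shows one way such an operator can be modeled in plain Python. `SketchTask` is hypothetical: it borrows the attribute names `_upstream_task_codes` and `_downstream_task_codes` from the SubWorkflow attribute list, but the method body is an assumption, not pydolphinscheduler's implementation.

```python
class SketchTask:
    """Hypothetical task that records its dependencies as sets of task codes."""

    def __init__(self, name: str, code: int) -> None:
        self.name = name
        self.code = code
        self._upstream_task_codes: set[int] = set()
        self._downstream_task_codes: set[int] = set()

    def __rshift__(self, other: "SketchTask") -> "SketchTask":
        # `self >> other` means self must finish before other starts.
        self._downstream_task_codes.add(other.code)
        other._upstream_task_codes.add(self.code)
        # Returning the right-hand task lets calls chain: a >> b >> c.
        return other


pre_task = SketchTask("pre-task", code=1)
sub_task = SketchTask("sub_workflow", code=2)
pre_task >> sub_task
print(pre_task._downstream_task_codes, sub_task._upstream_task_codes)  # {2} {1}
```

Returning the right-hand operand is what makes chains such as `a >> b >> c` read left to right as an execution order.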

Dive Into

Task sub workflow.

class pydolphinscheduler.tasks.sub_workflow.SubWorkflow(name: str, workflow_name: str, *args, **kwargs)

Bases: BatchTask

Task SubWorkflow object, declare behavior for SubWorkflow task to dolphinscheduler.

get_workflow_info(workflow_name: str) -> dict

Get workflow info from the Java gateway, including the workflow id, name, and code.

_downstream_task_codes: set[int]
_task_custom_attr: set = {'process_definition_code'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property process_definition_code: str

Get workflow code, a wrapper for get_workflow_info().

We cannot rename this property to workflow_code, because process_definition_code is the keyword DolphinScheduler itself uses.
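To make the wrapper relationship concrete, here is a sketch of a task whose process_definition_code property delegates to get_workflow_info and keeps only the code. `SketchSubWorkflow` and the injected `gateway_lookup` callable are hypothetical stand-ins for the Java gateway, and the camelCase parameter key is an assumption about how custom attributes are serialized, not a documented guarantee.

```python
from typing import Callable


class SketchSubWorkflow:
    """Hypothetical model of how a sub workflow task resolves its target."""

    # Attributes listed here are exported as task parameters (assumption).
    _task_custom_attr = {"process_definition_code"}

    def __init__(
        self, name: str, workflow_name: str, gateway_lookup: Callable[[str], dict]
    ) -> None:
        self.name = name
        self.workflow_name = workflow_name
        # Stand-in for the Java gateway call; not a pydolphinscheduler parameter.
        self._gateway_lookup = gateway_lookup

    def get_workflow_info(self, workflow_name: str) -> dict:
        # Returns workflow info such as id, name and code.
        return self._gateway_lookup(workflow_name)

    @property
    def process_definition_code(self) -> str:
        # Thin wrapper: keep only the code from the workflow info.
        return self.get_workflow_info(self.workflow_name)["code"]

    @property
    def task_params(self) -> dict:
        # Assumption: custom attributes are read by name and emitted with
        # camelCase keys, e.g. process_definition_code -> processDefinitionCode.
        def to_camel(snake: str) -> str:
            head, *rest = snake.split("_")
            return head + "".join(part.capitalize() for part in rest)

        return {to_camel(attr): getattr(self, attr) for attr in self._task_custom_attr}


def fake_gateway(workflow_name: str) -> dict:
    # Hypothetical gateway response; real ids and codes come from the server.
    return {"id": 7, "name": workflow_name, "code": "4242"}


task = SketchSubWorkflow("sub_workflow", "sub_workflow_downstream", fake_gateway)
print(task.task_params)  # {'processDefinitionCode': '4242'}
```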

YAML file example

# Define the workflow
workflow:
  name: "SubWorkflow"

tasks:
  - name: example_workflow
    task_type: SubWorkflow
    workflow_name: $WORKFLOW{"example_sub_workflow.yaml"}

  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
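In this YAML example, workflow_name is given as `$WORKFLOW{"example_sub_workflow.yaml"}` rather than a literal name. Assuming the loader replaces this placeholder with the name declared in the referenced file (the real loader may also create or submit that workflow), the resolution step can be sketched as follows. `resolve_workflow_name` and the pre-parsed `loaded_files` mapping are hypothetical; a real loader would read the file from disk.

```python
import re

# Matches the $WORKFLOW{"<file>"} placeholder used in the YAML example.
WORKFLOW_PLACEHOLDER = re.compile(r'^\$WORKFLOW\{"(?P<path>[^"]+)"\}$')


def resolve_workflow_name(value: str, loaded_files: dict[str, dict]) -> str:
    """Hypothetical resolver: swap the placeholder for the referenced workflow's name.

    `loaded_files` maps a YAML file name to its already-parsed content.
    """
    match = WORKFLOW_PLACEHOLDER.match(value.strip())
    if match is None:
        return value  # plain workflow name, used as-is
    definition = loaded_files[match.group("path")]
    return definition["workflow"]["name"]


files = {
    "example_sub_workflow.yaml": {
        "workflow": {"name": "example_workflow_for_sub_workflow"},
    },
}
print(resolve_workflow_name('$WORKFLOW{"example_sub_workflow.yaml"}', files))
# example_workflow_for_sub_workflow
```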

example_sub_workflow.yaml:

# Define the workflow
workflow:
  name: "example_workflow_for_sub_workflow"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
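Each `deps` list names the tasks that must finish before a task starts. As an illustration of what those dependencies imply (not of how DolphinScheduler schedules tasks), Python's standard-library `graphlib` (Python 3.9+) can compute a valid execution order for the tasks in example_sub_workflow.yaml. `execution_order` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(deps: dict[str, list[str]]) -> list[str]:
    """Return task names in an order that respects every `deps` entry.

    Raises graphlib.CycleError if the dependencies form a cycle.
    """
    # TopologicalSorter expects node -> predecessors, which matches `deps`.
    return list(TopologicalSorter(deps).static_order())


# Tasks from example_sub_workflow.yaml: task name -> its `deps` list.
tasks = {
    "task_1": [],
    "task_2": ["task_1"],
    "task_3": ["task_2"],
}
print(execution_order(tasks))  # ['task_1', 'task_2', 'task_3']

try:
    execution_order({"a": ["b"], "b": ["a"]})
except CycleError:
    print("cycle detected")
```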