Sub Workflow

Task type for running another workflow as a sub workflow.

class pydolphinscheduler.tasks.sub_workflow.SubWorkflow(name: str, workflow_name: str, *args, **kwargs)

Bases: Task

SubWorkflow task object. It declares the behavior of the SubWorkflow task for dolphinscheduler.
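A minimal usage sketch, based only on the constructor signature above. The workflow and task names are hypothetical, and it assumes a reachable dolphinscheduler Java gateway plus the `Workflow` and `Shell` classes from the same package. The imports sit inside the function so that nothing contacts the gateway until it is called.

```python
def build_and_submit():
    """Submit a child workflow, then a parent workflow that calls it."""
    # Imported lazily: these need pydolphinscheduler installed.
    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell
    from pydolphinscheduler.tasks.sub_workflow import SubWorkflow

    # The child workflow is submitted first, so that it can be looked up
    # by name when the parent workflow is submitted.
    with Workflow(name="child_workflow") as child:
        Shell(name="child_task", command="echo child")
        child.submit()

    with Workflow(name="parent_workflow") as parent:
        before = Shell(name="before_sub", command="echo before")
        # workflow_name is the name of the workflow to run as a sub workflow.
        sub = SubWorkflow(name="call_child", workflow_name=child.name)
        before >> sub  # run the shell task before the sub workflow
        parent.submit()
```

Calling `build_and_submit()` only succeeds against a running dolphinscheduler instance.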

get_workflow_info(workflow_name: str) → Dict

Get workflow info from the Java gateway, including the workflow id, name, and code.

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'process_definition_code'}
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
property process_definition_code: str

Get the workflow code. This is a wrapper around get_workflow_info().

This property cannot be renamed to workflow_code, because process_definition_code is a keyword used by dolphinscheduler itself.
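The wrapper relationship between `process_definition_code` and `get_workflow_info()` can be sketched as follows. This is an illustrative stand-in, not the library's implementation: `SubWorkflowSketch`, `fake_gateway`, and the dictionary keys `id`, `name`, and `code` are assumptions made for the example, and the Java gateway call is replaced by a plain function.

```python
from typing import Callable, Dict


class SubWorkflowSketch:
    """Illustrative stand-in, NOT the pydolphinscheduler class."""

    def __init__(self, workflow_name: str, lookup: Callable[[str], Dict]):
        self._workflow_name = workflow_name
        # Hypothetical replacement for the Java gateway call.
        self._lookup = lookup

    def get_workflow_info(self, workflow_name: str) -> Dict:
        # In the real task this information comes from the Java gateway.
        return self._lookup(workflow_name)

    @property
    def process_definition_code(self) -> str:
        # Thin wrapper: fetch the info and return only the code.
        return self.get_workflow_info(self._workflow_name)["code"]


def fake_gateway(workflow_name: str) -> Dict:
    # Assumed response shape: workflow id, name, and code.
    return {"id": 1, "name": workflow_name, "code": "1234567890"}


task = SubWorkflowSketch("example_workflow_for_sub_workflow", fake_gateway)
code = task.process_definition_code
```

Keeping the property name aligned with the `_task_custom_attr` entry means the value can be collected under the `process_definition_code` key without any renaming step.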

YAML file example

# Define the workflow
workflow:
  name: "SubWorkflow"

tasks:
  - name: example_workflow
    task_type: SubWorkflow
    workflow_name: $WORKFLOW{"example_sub_workflow.yaml"}

  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }

example_sub_workflow.yaml:

# Define the workflow
workflow:
  name: "example_workflow_for_sub_workflow"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
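In the first YAML file above, `workflow_name` is not a literal name but a `$WORKFLOW{"..."}` token that points at the second file. The sketch below shows one way such a token could be recognized with a regular expression. The helper `referenced_workflow_file` is hypothetical; how pydolphinscheduler itself parses the token is not shown here and may differ.

```python
import re
from typing import Optional

# Matches a value such as: $WORKFLOW{"example_sub_workflow.yaml"}
WORKFLOW_REF = re.compile(r'\$WORKFLOW\{"([^"]+)"\}')


def referenced_workflow_file(value: str) -> Optional[str]:
    """Return the file named by a $WORKFLOW{"..."} token, or None."""
    match = WORKFLOW_REF.fullmatch(value.strip())
    if match is None:
        return None  # a plain workflow name, not a file reference
    return match.group(1)


token_result = referenced_workflow_file('$WORKFLOW{"example_sub_workflow.yaml"}')
plain_result = referenced_workflow_file("example_workflow_for_sub_workflow")
```

Here `token_result` holds the referenced file name, and `plain_result` is `None` because the second value carries no token.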