Sub Process

Task sub_process.

class pydolphinscheduler.tasks.sub_process.SubProcess(name: str, process_definition_name: str, *args, **kwargs)[source]

Bases: Task

Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.

get_process_definition_info(process_definition_name: str) Dict[source]

Get process definition info from java gateway, contains process definition id, name, code.

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'process_definition_code'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property process_definition_code: str

Get process definition code, a wrapper for get_process_definition_info().

YAML file example

# Define the workflow
workflow:
  name: "SubWorkflow"

tasks:
  - name: example_workflow
    task_type: SubProcess
    process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"}

  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }

example_subprocess.yaml:

# Define the workflow
workflow:
  name: "example_workflow_for_sub_workflow"

# Define the tasks under the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }