Sub Workflow
Task sub workflow.
- class pydolphinscheduler.tasks.sub_workflow.SubWorkflow(name: str, workflow_name: str, *args, **kwargs)
Bases: Task
SubWorkflow task object. It declares the behavior of a sub-workflow task for DolphinScheduler.
- get_workflow_info(workflow_name: str) -> Dict
Get the workflow info from the Java gateway, including the workflow id, name, and code.
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'process_definition_code'}
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- property process_definition_code: str
Get the workflow code. This is a wrapper around get_workflow_info(). The property cannot be renamed to workflow_code, because the current name is a keyword used by DolphinScheduler itself.
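The wrapper relationship described above can be sketched with a stand-in class. This is not the pydolphinscheduler implementation: `FakeGateway`, `SubWorkflowSketch`, the `lookup` method, and the dictionary keys `id`, `name`, and `code` are hypothetical names chosen for illustration, and the real class queries a running Java gateway instead of a canned dictionary.

```python
from typing import Dict


class FakeGateway:
    """Hypothetical stand-in for the Java gateway; returns canned workflow info."""

    def __init__(self, workflows: Dict[str, Dict]):
        self._workflows = workflows

    def lookup(self, workflow_name: str) -> Dict:
        return self._workflows[workflow_name]


class SubWorkflowSketch:
    """Illustrative only; not the pydolphinscheduler SubWorkflow class."""

    # Mirrors the _task_custom_attr value listed above. How the library
    # consumes this set is an assumption in this sketch.
    _task_custom_attr = {"process_definition_code"}

    def __init__(self, name: str, workflow_name: str, gateway: FakeGateway):
        self.name = name
        self.workflow_name = workflow_name
        self._gateway = gateway

    def get_workflow_info(self, workflow_name: str) -> Dict:
        # Assumed shape: a dict holding the workflow id, name, and code.
        return self._gateway.lookup(workflow_name)

    @property
    def process_definition_code(self) -> str:
        # Thin wrapper: look up the workflow and return only its code.
        return self.get_workflow_info(self.workflow_name)["code"]


gateway = FakeGateway(
    {
        "example_workflow_for_sub_workflow": {
            "id": 1,
            "name": "example_workflow_for_sub_workflow",
            "code": "123456",
        }
    }
)
task = SubWorkflowSketch("example_workflow", "example_workflow_for_sub_workflow", gateway)

# Collect the custom attributes by name, as a task definition builder might.
custom = {attr: getattr(task, attr) for attr in task._task_custom_attr}
```

Because the code is exposed as a property, anything that collects attributes by name picks up the resolved workflow code without calling get_workflow_info() directly.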
YAML file example
# Define the workflow
workflow:
  name: "SubWorkflow"

tasks:
  - name: example_workflow
    task_type: SubWorkflow
    workflow_name: $WORKFLOW{"example_sub_workflow.yaml"}

  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
example_sub_workflow.yaml:
# Define the workflow
workflow:
  name: "example_workflow_for_sub_workflow"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
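The `$WORKFLOW{"example_sub_workflow.yaml"}` value in the parent file points at a second YAML file. The following is a minimal sketch of how such a token could be recognized and turned into a file path. It assumes the referenced file is resolved relative to a base directory; the helper `referenced_workflow_file`, the pattern `WORKFLOW_TOKEN`, and the `examples` directory are hypothetical and are not taken from pydolphinscheduler, whose actual parsing may differ.

```python
import re
from pathlib import Path
from typing import Optional

# Matches a value of the form $WORKFLOW{"some/file.yaml"} and captures the path.
WORKFLOW_TOKEN = re.compile(r'\$WORKFLOW\{"(?P<path>[^"]+)"\}')


def referenced_workflow_file(value: str, base_dir: Path) -> Optional[Path]:
    """Return the file a $WORKFLOW{...} token refers to, or None for plain values."""
    match = WORKFLOW_TOKEN.fullmatch(value.strip())
    if match is None:
        return None
    # Assumption: the path is relative to the directory of the parent YAML file.
    return base_dir / match.group("path")


resolved = referenced_workflow_file(
    '$WORKFLOW{"example_sub_workflow.yaml"}', Path("examples")
)
plain = referenced_workflow_file("example_workflow_for_sub_workflow", Path("examples"))
```

Here `resolved` is the path to the sub-workflow definition, while `plain` is `None` because a bare workflow name carries no token.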