Sub Workflow
A sub workflow task triggers a run of an existing workflow. Make sure that workflow already exists in the current project before you create the sub workflow task.
Example
Here is a simple example of how to use the sub workflow task. Before creating a sub workflow task, we must make sure the target workflow already exists in the current project, so the first step is to create the workflow that will be used as the sub workflow.
with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
    name="task_sub_workflow_example"
) as wf_upstream:
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    wf_downstream.submit()
The workflow named sub_workflow_downstream is created once we call the submit method.
Then we create the main workflow; its sub workflow task points to the workflow we created before.
sw_task = SubWorkflow(
    name="sub_workflow",
    workflow_name=wf_downstream.name,
    workflow=wf_upstream,
)
Finally, we can submit or run the workflow that contains the sub workflow task with the submit or run method. You can also use a workflow that already exists in the current project instead of creating a new one.
Note
Only the workflow that contains the sub workflow task needs to be run; the sub workflow task then triggers the sub workflow run.
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.sub_workflow import SubWorkflow

# [start sub_workflow_declare]
with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
    name="task_sub_workflow_example"
) as wf_upstream:
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    wf_downstream.submit()
    # [end sub_workflow_declare]
    sub_workflow_pre = Shell(
        name="pre-task",
        command="echo 'prefix task for sub workflow'",
        workflow=wf_upstream,
    )
    # [start sub_workflow_task_declare]
    sw_task = SubWorkflow(
        name="sub_workflow",
        workflow_name=wf_downstream.name,
        workflow=wf_upstream,
    )
    # [end sub_workflow_task_declare]
    sub_workflow_pre >> sw_task
    # Make sure a workflow named `wf_downstream.name` exists before submitting or running the sub workflow task
    wf_upstream.run()
Dive Into
Task sub workflow.
- class pydolphinscheduler.tasks.sub_workflow.SubWorkflow(name: str, workflow_name: str, *args, **kwargs)
Bases: BatchTask
Task SubWorkflow object, declares the behavior of the SubWorkflow task for dolphinscheduler.
- get_workflow_info(workflow_name: str) -> dict
Get workflow info from the Java gateway, including the workflow id, name, and code.
- _downstream_task_codes: set[int]
- _task_custom_attr: set = {'process_definition_code'}
- _task_relation: set[TaskRelation]
- _timeout: timedelta | int
- _upstream_task_codes: set[int]
- property process_definition_code: str
Get the workflow code, a wrapper for get_workflow_info(). We cannot rename this property to workflow_code, because process_definition_code is a keyword used in dolphinscheduler itself.
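The relationship between this property and get_workflow_info() can be sketched as follows. This is a simplified stand-in, not the library's class: SubWorkflowSketch, the lookup callable that replaces the Java gateway, the fake_gateway data, and the "code" key in the returned dict are all assumptions made for illustration.

```python
from typing import Callable


class SubWorkflowSketch:
    """Hypothetical stand-in for SubWorkflow; it does not talk to a gateway."""

    def __init__(self, name: str, workflow_name: str, lookup: Callable[[str], dict]):
        self.name = name
        self.workflow_name = workflow_name
        # Assumption: `lookup` replaces the Java gateway call for this sketch.
        self._lookup = lookup

    def get_workflow_info(self, workflow_name: str) -> dict:
        """Return a dict with the workflow's id, name and code."""
        return self._lookup(workflow_name)

    @property
    def process_definition_code(self) -> str:
        """Resolve the sub workflow's code by name on each access."""
        # Assumption: the info dict stores the code under the key "code".
        return self.get_workflow_info(self.workflow_name)["code"]


def fake_gateway(workflow_name: str) -> dict:
    # Made-up data standing in for what a real gateway would return.
    return {"id": 1, "name": workflow_name, "code": "123456"}


task = SubWorkflowSketch("sub_workflow", "sub_workflow_downstream", fake_gateway)
resolved_code = task.process_definition_code
```

Because the code is looked up by name, the lookup can only succeed if a workflow with that name already exists, which is why the sub workflow has to be submitted first.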
YAML file example
# Define the workflow
workflow:
  name: "SubWorkflow"
tasks:
  - name: example_workflow
    task_type: SubWorkflow
    workflow_name: $WORKFLOW{"example_sub_workflow.yaml"}
  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
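The workflow_name value above uses a $WORKFLOW{"..."} token that names another YAML file. A minimal sketch of how such a token could be recognised and the file name pulled out is shown here. It is not pydolphinscheduler's own parser; extract_workflow_file and the regular expression are hypothetical and only illustrate the shape of the token.

```python
import re
from typing import Optional

# Matches $WORKFLOW{"<file>"} and captures the quoted file name.
WORKFLOW_TOKEN = re.compile(r'^\$WORKFLOW\{"([^"]+)"\}$')


def extract_workflow_file(value: str) -> Optional[str]:
    """Return the referenced YAML file name, or None for a plain value."""
    match = WORKFLOW_TOKEN.match(value.strip())
    return match.group(1) if match else None


referenced = extract_workflow_file('$WORKFLOW{"example_sub_workflow.yaml"}')
plain = extract_workflow_file("sub_workflow_downstream")
```

In this sketch a plain workflow name yields None, so a caller could tell a file reference apart from the name of a workflow that already exists.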
example_sub_workflow.yaml:
# Define the workflow
workflow:
  name: "example_workflow_for_sub_workflow"
# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
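To make the effect of deps concrete, the sketch below derives a run order for the three tasks in example_sub_workflow.yaml with the standard library. It assumes that deps lists the tasks that must finish before a task starts; it only illustrates the ordering and does not reflect how dolphinscheduler schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Task names and their `deps`, copied from example_sub_workflow.yaml.
deps = {
    "task_1": [],
    "task_2": ["task_1"],
    "task_3": ["task_2"],
}

# TopologicalSorter takes a mapping of node -> predecessors,
# which matches the meaning of `deps` assumed here.
run_order = list(TopologicalSorter(deps).static_order())
print(run_order)
```

Since each task depends on the previous one, the chain has a single valid order: task_1, then task_2, then task_3.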