Shell
A shell task type’s example and dive into information of PyDolphinScheduler.
Example
with Workflow(
name="tutorial",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
) as workflow:
# [end workflow_declare]
# [start task_declare]
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(
name="task_child_one",
command="""
echo "Executing line 1 with parameter str type ${param1}"
echo "Executing line 2 with parameter int type ${param2}"
echo "Executing line 3 with parameter build-in parameter currently date ${param3}"
""",
params={"param1": "str1", "param2": 123, "param3": "$[yyyy-MM-dd]"},
)
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
# [start resource_limit]
resource_limit = Shell(
name="resource_limit",
command="echo resource limit",
cpu_quota=1,
memory_max=100,
)
# [end resource_limit]
# [end task_declare]
# [start task_relation_declare]
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
resource_limit << task_union << task_group
Resource Limit Example
We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
resource_limit = Shell(
name="resource_limit",
command="echo resource limit",
cpu_quota=1,
memory_max=100,
)
Dive Into
Task shell.
- class pydolphinscheduler.tasks.shell.Shell(name: str, command: str, *args, **kwargs)[source]
Bases:
WorkerResourceMixin
,BatchTask
Task shell object, declare behavior for shell task to dolphinscheduler.
- Parameters:
name – A unique, meaningful string for the shell task.
command –
One or more command want to run in this task.
It could be simply command:
Shell(name=..., command="echo task shell")
or maybe same commands trying to do complex task:
command = '''echo task shell step 1; echo task shell step 2; echo task shell step 3 ''' Shell(name=..., command=command)
- _downstream_task_codes: set[int]
- _task_custom_attr: set = {'raw_script'}
- _task_relation: set[TaskRelation]
- _timeout: timedelta | int
- _upstream_task_codes: set[int]
- ext: set = {'.sh', '.zsh'}
- ext_attr: str = '_raw_script'
YAML file example
# Define the workflow
workflow:
name: "Shell"
release_state: "offline"
run: true
# Define the tasks within the workflow
tasks:
- name: task_parent
task_type: Shell
command: |
echo hello pydolphinscheduler
echo run task parent
- name: task_child_one
task_type: Shell
deps: [task_parent]
command: echo "child one"
- name: task_child_two
task_type: Shell
deps: [task_parent]
command: echo "child two"