Python
A Python task type’s example and dive into information of PyDolphinScheduler.
Example
"""An example workflow for task python."""
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.python import Python
with Workflow(
name="task_python_example",
) as workflow:
task_python = Python(
name="task",
definition="print('hello world.')",
)
# [start resource_limit]
python_resources_limit = Python(
name="python_resources_limit",
definition="print('hello world.')",
cpu_quota=1,
memory_max=100,
)
# [end resource_limit]
task_python >> python_resources_limit
workflow.submit()
Resource Limit Example
We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
python_resources_limit = Python(
name="python_resources_limit",
definition="print('hello world.')",
cpu_quota=1,
memory_max=100,
)
Dive Into
Task Python.
- class pydolphinscheduler.tasks.python.Python(name: str, definition: str | LambdaType, *args, **kwargs)[source]
Bases:
WorkerResourceMixin
,BatchTask
Task Python object, declare behavior for Python task to dolphinscheduler.
Python task support two types of parameters for :param:
definition
, and here is an example:Using str type of :param:
definition
python_task = Python(name="str_type", definition="print('Hello Python task.')")
Or using Python callable type of :param:
definition
def foo(): print("Hello Python task.") python_task = Python(name="str_type", definition=foo)
- Parameters:
name – The name for Python task. It define the task name.
definition – String format of Python script you want to execute or Python callable you want to execute.
- _build_exe_str() str [source]
Build executable string from given definition.
Attribute
self.definition
almost is a function, we need to call this function after parsing it to string. The easier way to call a function is using syntaxfunc()
and we use it to call it too.
- _downstream_task_codes: set[int]
- _task_custom_attr: set = {'raw_script'}
- _task_relation: set[TaskRelation]
- _timeout: timedelta | int
- _upstream_task_codes: set[int]
- ext: set = {'.py'}
- ext_attr: str | types.FunctionType = '_definition'
- property raw_script: str
Get python task define attribute raw_script.
YAML file example
# Define the workflow
workflow:
name: "Python"
# Define the tasks within the workflow
tasks:
- name: python
task_type: Python
definition: |
import os
print(os)
print("1")
print("2")