Python
A Python task type’s example and dive into information of PyDolphinScheduler.
Example
"""An example workflow for task python."""
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.python import Python
with Workflow(
    name="task_python_example",
) as workflow:
    task_python = Python(
        name="task",
        definition="print('hello world.')",
    )
    # [start resource_limit]
    python_resources_limit = Python(
        name="python_resources_limit",
        definition="print('hello world.')",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]
    task_python >> python_resources_limit
    workflow.submit()
Resource Limit Example
We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
    python_resources_limit = Python(
        name="python_resources_limit",
        definition="print('hello world.')",
        cpu_quota=1,
        memory_max=100,
    )
Dive Into
Task Python.
- class pydolphinscheduler.tasks.python.Python(name: str, definition: str | LambdaType, *args, **kwargs)[source]
 Bases:
WorkerResourceMixin,BatchTaskTask Python object, declare behavior for Python task to dolphinscheduler.
Python task support two types of parameters for :param:
definition, and here is an example:Using str type of :param:
definitionpython_task = Python(name="str_type", definition="print('Hello Python task.')")
Or using Python callable type of :param:
definitiondef foo(): print("Hello Python task.") python_task = Python(name="str_type", definition=foo)
- Parameters:
 name – The name for Python task. It define the task name.
definition – String format of Python script you want to execute or Python callable you want to execute.
- _build_exe_str() str[source]
 Build executable string from given definition.
Attribute
self.definitionalmost is a function, we need to call this function after parsing it to string. The easier way to call a function is using syntaxfunc()and we use it to call it too.
- _downstream_task_codes: set[int]
 
- _task_custom_attr: set = {'raw_script'}
 
- _task_relation: set[TaskRelation]
 
- _timeout: timedelta | int
 
- _upstream_task_codes: set[int]
 
- ext: set = {'.py'}
 
- ext_attr: str | types.FunctionType = '_definition'
 
- property raw_script: str
 Get python task define attribute raw_script.
YAML file example
# Define the workflow
workflow:
  name: "Python"
# Define the tasks within the workflow
tasks:
  - name: python
    task_type: Python
    definition: |
      import os
      print(os)
      print("1")
      print("2")