Python

An example of the Python task type, followed by a dive into its details in PyDolphinScheduler.

Example

"""An example workflow for task python."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.python import Python

with Workflow(
    name="task_python_example",
) as workflow:
    task_python = Python(
        name="task",
        definition="print('hello world.')",
    )

    # [start resource_limit]
    python_resources_limit = Python(
        name="python_resources_limit",
        definition="print('hello world.')",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]

    task_python >> python_resources_limit
    workflow.submit()

Resource Limit Example

We can add resource limits, such as CPU quota and maximum memory, by passing the cpu_quota and memory_max parameters when declaring a task.

    python_resources_limit = Python(
        name="python_resources_limit",
        definition="print('hello world.')",
        cpu_quota=1,
        memory_max=100,
    )

Dive Into

Task Python.

class pydolphinscheduler.tasks.python.Python(name: str, definition: str | LambdaType, *args, **kwargs)

Bases: WorkerResourceMixin, BatchTask

Task Python object, declaring the behavior of a Python task for DolphinScheduler.

The Python task accepts two types of value for the definition parameter, as the following examples show.

Using a str as definition:

python_task = Python(name="str_type", definition="print('Hello Python task.')")

Or using a Python callable as definition:

def foo():
    print("Hello Python task.")

python_task = Python(name="callable_type", definition=foo)

Parameters:
  • name – The name of the Python task.

  • definition – The Python code to execute, given either as a string containing a script or as a Python callable.

_build_exe_str() -> str

Build an executable string from the given definition.

The definition attribute is usually a function, so after converting it to a string we also need to call it. The simplest way to call a function is the func() syntax, so this method appends such a call to the generated string.
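The following minimal, standalone sketch shows what such a builder might do. It is not the library's code. The helper name build_exe_str, the use of inspect.getsource for callables, and the regex that detects a leading def in string definitions are all assumptions made for illustration.

```python
import inspect
import re
import types
from typing import Union

# Hypothetical helper, written for illustration only; it is NOT the code
# shipped in pydolphinscheduler.tasks.python.
_FUNC_DEF = re.compile(r"^def (\w+)\(")


def build_exe_str(definition: Union[str, types.FunctionType]) -> str:
    """Return a script that both defines and runs the given definition."""
    if isinstance(definition, types.FunctionType):
        # Callable: take its source text (normally ending with a newline)
        # and append a call by name so the function actually runs.
        source = inspect.getsource(definition)
        return f"{source}{definition.__name__}()"
    match = _FUNC_DEF.match(definition)
    if match is None:
        # Plain script such as "print('hi')": nothing to call, run as-is.
        return definition
    # String starting with "def name(": append "name()" on its own line.
    sep = "" if definition.endswith("\n") else "\n"
    return f"{definition}{sep}{match.group(1)}()"


if __name__ == "__main__":
    script = build_exe_str("def foo():\n    print('Hello Python task.')")
    print(script)
    exec(script)  # defines foo, then calls it
```

If you pass a function object, the code takes the inspect.getsource branch. That branch needs the function to be defined in a source file. As written, it also assumes the function sits at module top level, because a nested function would keep its indentation.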

_downstream_task_codes: set[int]
_task_custom_attr: set = {'raw_script'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
ext: set = {'.py'}
ext_attr: str | types.FunctionType = '_definition'
property raw_script: str

Get raw_script, the Python script this task will execute.

YAML File Example

# Define the workflow
workflow:
  name: "Python"

# Define the tasks within the workflow
tasks:
  - name: python
    task_type: Python
    definition: |
      import os
      print(os)
      print("1")
      print("2")