Kubernetes

A Kubernetes task type’s example and dive into information of PyDolphinScheduler.

Example

"""A example workflow for task kubernetes."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.kubernetes import Kubernetes

with Workflow(
    name="task_kubernetes_example",
    tenant="tenant_exists",
) as workflow:
    task_k8s = Kubernetes(
        name="task_k8s",
        image="ds-dev",
        namespace=str({"name": "default", "cluster": "lab"}),
        min_cpu_cores=2.0,
        min_memory_space=10.0,
    )
    workflow.submit()

Dive Into

Task Kubernetes.

class pydolphinscheduler.tasks.kubernetes.Kubernetes(name: str, image: str, namespace: str, min_cpu_cores: float, min_memory_space: float, *args, **kwargs)[source]

Bases: Task

Task Kubernetes object, declare behavior for Kubernetes task to dolphinscheduler.

Parameters:
  • name – task name

  • image – the registry url for image.

  • namespace – the namespace for running Kubernetes task.

  • min_cpu_cores – min CPU requirement for running Kubernetes task.

  • min_memory_space – min memory requirement for running Kubernetes task.

  • params_map – It is a local user-defined parameter for Kubernetes task.

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'image', 'min_cpu_cores', 'min_memory_space', 'namespace'}
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]

YAML file example

# Define the workflow
workflow:
  name: "kubernetes"

# Define the tasks within the workflow
tasks:
  - name: kubernetes
    task_type: K8S
    image: ds-dev
    namespace: '{ "name": "default","cluster": "lab" }'
    minCpuCores: 2.0
    minMemorySpace: 10.0