Spark

An example of the Spark task type and a dive into its details in PyDolphinScheduler.

Example

"""A example workflow for task spark."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark

with Workflow(name="task_spark_example", tenant="tenant_exists") as workflow:
    task = Spark(
        name="task_spark",
        main_class="org.apache.spark.examples.SparkPi",
        main_package="spark-examples_2.12-3.2.0.jar",
        program_type=ProgramType.JAVA,
        deploy_mode=DeployMode.LOCAL,
    )
    workflow.run()

Dive Into

Task Spark.

class pydolphinscheduler.tasks.spark.DeployMode[source]

Bases: str

Spark deploy mode; for now it contains only LOCAL, CLIENT, and CLUSTER.

CLIENT = 'client'
CLUSTER = 'cluster'
LOCAL = 'local'
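Because DeployMode is based on `str`, its members are plain string values that drop directly into the serialized task definition. A minimal illustrative sketch of that pattern (not the library code itself; the real class is `pydolphinscheduler.tasks.spark.DeployMode`):

```python
# Illustrative sketch only: mimics how a str-based constants class such as
# DeployMode behaves. The real implementation lives in
# pydolphinscheduler.tasks.spark and may differ in detail.
class DeployModeSketch(str):
    LOCAL = "local"
    CLIENT = "client"
    CLUSTER = "cluster"

# The members are ordinary strings, so they serialize straight into
# the JSON/YAML task payload without any conversion step.
print(DeployModeSketch.CLUSTER)                 # cluster
print(isinstance(DeployModeSketch.LOCAL, str))  # True
```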
class pydolphinscheduler.tasks.spark.Spark(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', app_name: str | None = None, driver_cores: int | None = 1, driver_memory: str | None = '512M', num_executors: int | None = 2, executor_memory: str | None = '2G', executor_cores: int | None = 2, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]

Bases: Engine

Task Spark object, declaring the behavior of a Spark task for DolphinScheduler.

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'app_name', 'deploy_mode', 'driver_cores', 'driver_memory', 'executor_cores', 'executor_memory', 'main_args', 'num_executors', 'others'}
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
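The signature above exposes Spark's resource-tuning knobs (driver cores and memory, executor count, cores, and memory). As a hedged sketch, the documented defaults are collected below in a plain dict purely for illustration; in real use these names are passed as keyword arguments to `Spark(...)`:

```python
# Default resource settings copied from the Spark task signature above.
# Shown as a plain dict for illustration only; in practice they are
# keyword arguments to pydolphinscheduler.tasks.spark.Spark(...).
spark_defaults = {
    "program_type": "SCALA",   # or JAVA / PYTHON via ProgramType
    "deploy_mode": "cluster",  # default; the example above overrides it to local
    "driver_cores": 1,
    "driver_memory": "512M",
    "num_executors": 2,
    "executor_memory": "2G",
    "executor_cores": 2,
}

# Total executor cores requested under the defaults: 2 executors x 2 cores.
total_executor_cores = (
    spark_defaults["num_executors"] * spark_defaults["executor_cores"]
)
print(total_executor_cores)  # 4
```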

YAML file example

# Define the workflow
workflow:
  name: "Spark"

# Define the tasks within the workflow
tasks:
  - name: task
    task_type: Spark
    main_class: org.apache.spark.examples.SparkPi
    main_package: test_java.jar
    program_type: SCALA
    deploy_mode: local