Flink
An example of the Flink task type, followed by a detailed reference for its PyDolphinScheduler classes.
Example
"""An example workflow for the Flink task."""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType

# Declare the workflow; tasks created inside the block are attached to it.
with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
    task = Flink(
        name="task_flink",
        main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
        main_package="WordCount.jar",
        program_type=ProgramType.JAVA,
        deploy_mode=DeployMode.LOCAL,
    )
    # Submit the workflow to DolphinScheduler and start it.
    pd.run()
Dive Into
Task Flink.
- class pydolphinscheduler.tasks.flink.DeployMode
Bases: str
Flink deploy mode. Currently only LOCAL and CLUSTER are defined.
- CLUSTER = 'cluster'
- LOCAL = 'local'
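Because DeployMode derives from str, its members can be used wherever a plain string is expected. The following is a minimal sketch of that behavior, not the library's code: `DeployModeSketch` is a hypothetical stand-in that only mirrors the two documented constants, and `normalize_deploy_mode` is a hypothetical helper added for illustration.

```python
class DeployModeSketch(str):
    """Hypothetical stand-in mirroring the documented DeployMode constants."""

    LOCAL = "local"
    CLUSTER = "cluster"


def normalize_deploy_mode(value: str) -> str:
    """Hypothetical helper: lower-case a mode and reject unknown values."""
    allowed = {DeployModeSketch.LOCAL, DeployModeSketch.CLUSTER}
    mode = value.lower()
    if mode not in allowed:
        raise ValueError(f"unsupported deploy mode: {value!r}")
    return mode


# The constants are ordinary strings, so string comparison works directly.
is_local = DeployModeSketch.LOCAL == "local"
```

A check of this kind can catch a misspelled mode before a workflow is submitted, assuming only the two documented values are accepted.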
- class pydolphinscheduler.tasks.flink.Flink(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', flink_version: FlinkVersion | None = '<1.10', app_name: str | None = None, job_manager_memory: str | None = '1G', task_manager_memory: str | None = '2G', slot: int | None = 1, task_manager: int | None = 2, parallelism: int | None = 1, main_args: str | None = None, others: str | None = None, *args, **kwargs)
Bases: Engine
Flink task object; declares the behavior of a Flink task for DolphinScheduler.
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'deploy_mode', 'flink_version', 'job_manager_memory', 'main_args', 'others', 'parallelism', 'slot', 'task_manager', 'task_manager_memory'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
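The signature and the `_task_custom_attr` set above can be read together: the set names the Flink-specific fields, and the signature supplies their defaults. The sketch below illustrates that relationship with a standalone dataclass. `FlinkArgsSketch`, `CUSTOM_ATTRS`, and `custom_attrs()` are hypothetical names introduced here. They are not part of PyDolphinScheduler and do not show how the library serializes a task.

```python
from dataclasses import asdict, dataclass
from typing import Optional

# Names copied from the documented _task_custom_attr set.
CUSTOM_ATTRS = {
    "app_name", "deploy_mode", "flink_version", "job_manager_memory",
    "main_args", "others", "parallelism", "slot", "task_manager",
    "task_manager_memory",
}


@dataclass
class FlinkArgsSketch:
    """Hypothetical container; defaults copied from the documented signature."""

    name: str
    main_class: str
    main_package: str
    program_type: str = "SCALA"
    deploy_mode: str = "cluster"
    flink_version: str = "<1.10"
    app_name: Optional[str] = None
    job_manager_memory: str = "1G"
    task_manager_memory: str = "2G"
    slot: int = 1
    task_manager: int = 2
    parallelism: int = 1
    main_args: Optional[str] = None
    others: Optional[str] = None

    def custom_attrs(self) -> dict:
        """Return only the fields whose names appear in CUSTOM_ATTRS."""
        return {k: v for k, v in asdict(self).items() if k in CUSTOM_ATTRS}


# Same arguments as the example workflow; everything else keeps its default.
args = FlinkArgsSketch(
    name="task_flink",
    main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
    main_package="WordCount.jar",
    program_type="JAVA",
    deploy_mode="local",
)
attrs = args.custom_attrs()
```

Here `attrs` holds the ten Flink-specific settings, with `deploy_mode` overridden to `local` and the rest left at the documented defaults.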
YAML file example
# Define the workflow
workflow:
  name: "Flink"

# Define the tasks under the workflow
tasks:
  - name: task
    task_type: Flink
    main_class: org.apache.flink.streaming.examples.wordcount.WordCount
    main_package: test_java.jar
    program_type: JAVA
    deploy_mode: local