OpenMLDB

An example of the OpenMLDB task type, followed by a detailed look at its API in PyDolphinScheduler.

Example

"""An example workflow for the OpenMLDB task."""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.openmldb import OpenMLDB

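# SQL run by the task: select the database, set the job timeout,
# then load a local CSV file into the table, overwriting existing data.
sql = """USE demo_db;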
set @@job_timeout=200000;
LOAD DATA INFILE 'file:///tmp/train_sample.csv'
INTO TABLE talkingdata OPTIONS(mode='overwrite');
"""

with ProcessDefinition(
    name="task_openmldb_example",
    tenant="tenant_exists",
) as pd:
    task_openmldb = OpenMLDB(
        name="task_openmldb",
        zookeeper="127.0.0.1:2181",
        zookeeper_path="/openmldb",
        execute_mode="offline",
        sql=sql,
    )

    # Submit the workflow to DolphinScheduler and run it.
    pd.run()

Dive Into

Task OpenMLDB.

class pydolphinscheduler.tasks.openmldb.OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs)

Bases: Task

Task OpenMLDB object. It declares the behavior of an OpenMLDB task for DolphinScheduler.

Parameters:
  • name – task name

  • zookeeper – OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.

  • zookeeper_path – OpenMLDB cluster zookeeper path, e.g. /openmldb.

  • execute_mode – Initial execution mode, either offline or online. You can switch the mode within the SQL statement itself.

  • sql – SQL statement.
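
Since the execution mode can be switched inside the SQL statement, one option is to prepend a mode-switching statement to the SQL passed to the task. The following is a minimal sketch, assuming OpenMLDB's SET @@execute_mode syntax; the helper with_execute_mode is hypothetical and is not part of PyDolphinScheduler.

```python
VALID_MODES = ("offline", "online")


def with_execute_mode(sql: str, mode: str) -> str:
    """Return sql prefixed with a statement that switches the execute mode.

    Hypothetical helper for illustration only; it just builds a string.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"execute mode must be one of {VALID_MODES}, got {mode!r}")
    # Assumption: OpenMLDB accepts SET @@execute_mode='<mode>'; as a statement.
    return f"SET @@execute_mode='{mode}';\n{sql}"


# Build a statement that runs in online mode regardless of the task's
# initial execute_mode argument.
online_sql = with_execute_mode("USE demo_db;", "online")
```

The resulting string could then be passed as the sql argument of the OpenMLDB task, with execute_mode still giving the mode the task starts in.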

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'execute_mode', 'sql', 'zk', 'zk_path'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
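
Note that the names in _task_custom_attr ('zk', 'zk_path') differ from the constructor arguments (zookeeper, zookeeper_path). The stand-alone sketch below shows one way such a mapping could work, assuming the constructor stores the arguments under the shorter names. SketchTask and custom_params are hypothetical names used for illustration; this is not the library's implementation.

```python
class SketchTask:
    """Hypothetical stand-in for a task class; not PyDolphinScheduler code."""

    # Names of the instance attributes that make up the task-specific payload.
    _task_custom_attr = {"zk", "zk_path", "execute_mode", "sql"}

    def __init__(self, name, zookeeper, zookeeper_path, execute_mode, sql):
        self.name = name
        # Assumption: constructor arguments are stored under the shorter names.
        self.zk = zookeeper
        self.zk_path = zookeeper_path
        self.execute_mode = execute_mode
        self.sql = sql

    def custom_params(self):
        """Collect the attributes listed in _task_custom_attr into a dict."""
        return {attr: getattr(self, attr) for attr in sorted(self._task_custom_attr)}


task = SketchTask(
    name="task_openmldb",
    zookeeper="127.0.0.1:2181",
    zookeeper_path="/openmldb",
    execute_mode="offline",
    sql="USE demo_db;",
)
params = task.custom_params()
```

In this sketch only the attributes named in _task_custom_attr end up in params, so general fields such as name stay out of the task-specific payload.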

YAML file example

# Define the workflow
workflow:
  name: "OpenMLDB"

# Define the tasks under the workflow
tasks:
  - name: OpenMLDB
    task_type: OpenMLDB
    zookeeper: "127.0.0.1:2181"
    zookeeper_path: "/openmldb"
    execute_mode: "online"
    sql: |
      USE demo_db;
      set @@job_timeout=200000;
      LOAD DATA INFILE 'file:///tmp/train_sample.csv'
      INTO TABLE talkingdata OPTIONS(mode='overwrite');