OpenMLDB
A OpenMLDB task type’s example and dive into information of PyDolphinScheduler.
Example
"""A example workflow for task openmldb."""
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.openmldb import OpenMLDB
sql = """USE demo_db;
set @@job_timeout=200000;
LOAD DATA INFILE 'file:///tmp/train_sample.csv'
INTO TABLE talkingdata OPTIONS(mode='overwrite');
"""
with Workflow(
name="task_openmldb_example",
) as workflow:
task_openmldb = OpenMLDB(
name="task_openmldb",
zookeeper="127.0.0.1:2181",
zookeeper_path="/openmldb",
execute_mode="offline",
sql=sql,
)
workflow.run()
Dive Into
Task OpenMLDB.
- class pydolphinscheduler.tasks.openmldb.OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs)[source]
Bases:
Task
Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler.
- Parameters:
name – task name
zookeeper – OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.
zookeeper_path – OpenMLDB cluster zookeeper path, e.g. /openmldb.
execute_mode – Determine the init mode, offline or online. You can switch it in sql statementself.
sql – SQL statement.
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'execute_mode', 'sql', 'zk', 'zk_path'}
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
YAML file example
# Define the workflow
workflow:
name: "OpenMLDB"
# Define the tasks within the workflow
tasks:
- name: OpenMLDB
task_type: OpenMLDB
zookeeper: "127.0.0.1:2181"
zookeeper_path: "/openmldb"
execute_mode: "online"
sql: |
USE demo_db;
set @@job_timeout=200000;
LOAD DATA INFILE 'file:///tmp/train_sample.csv'
INTO TABLE talkingdata OPTIONS(mode='overwrite');