SQL

Task sql.

class pydolphinscheduler.tasks.sql.Sql(name: str, datasource_name: str, sql: str, sql_type: str | None = None, pre_statements: str | None = None, post_statements: str | None = None, display_rows: int | None = 10, *args, **kwargs)[source]

Bases: Task

Task SQL object, declare behavior for SQL task to dolphinscheduler.

It should run sql job in multiply sql lik engine, such as: - ClickHouse - DB2 - HIVE - MySQL - Oracle - Postgresql - Presto - SQLServer You provider datasource_name contain connection information, it decisions which database type and database instance would run this sql.

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property sql_type: str

Judgement sql type, it will return the SQL type for type SELECT or NOT_SELECT.

If param_sql_type dot not specific, will use regexp to check which type of the SQL is. But if param_sql_type is specific will use the parameter overwrites the regexp way

property task_params: Dict

Override Task.task_params for sql task.

sql task have some specials attribute for task_params, and is odd if we directly set as python property, so we Override Task.task_params here.

class pydolphinscheduler.tasks.sql.SqlType[source]

Bases: object

SQL type, for now it just contain SELECT and NO_SELECT.

NOT_SELECT = '1'
SELECT = '0'

YAML file example

# Define the workflow
workflow:
  name: "Sql"

# Define the tasks under the workflow
tasks:
  - name: task_base
    task_type: Sql
    datasource_name: "db"
    sql: show tables;

  - name: task_multi_line
    task_type: Sql
    datasource_name: "db"
    sql: |
      show tables;
      select id from version where id=1;

  - name: task_file
    task_type: Sql
    datasource_name: "db"
    sql: $FILE{"example_sql.sql"}

  # Or you can define task "task_union" it with one line
  - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}

  # Or you can define task "task_union" it with one line
  - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}

example_sql.sql:

select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;