SQL

An example of the SQL task, showing how to use it and detailing its parameters.

This task type can execute SQL against multiple types of databases, including:

  • MySQL

  • PostgreSQL

  • Oracle

  • SQL Server

  • DB2

  • Hive

  • Presto

  • Trino

  • ClickHouse

Example


"""An example workflow for task SQL."""
from pathlib import Path

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql, SqlType

with Workflow(
    name="task_sql_example",
) as workflow:
    # [start bare_sql_desc]
    bare_sql = Sql(
        name="bare_sql",
        datasource_name="metadata",
        sql="select * from t_ds_version",
    )
    # [end bare_sql_desc]
    # [start sql_file_desc]
    sql_file = Sql(
        name="sql_file",
        datasource_name="metadata",
        sql="ext/example.sql",
        sql_type=SqlType.SELECT,
        resource_plugin=Local(prefix=str(Path(__file__).parent)),
    )
    # [end sql_file_desc]
    # [start sql_with_pre_post_desc]
    sql_with_pre_post = Sql(
        name="sql_with_pre_post",
        datasource_name="metadata",
        sql="select * from t_ds_version",
        pre_statements=[
            "update table_one set version = '1.3.6'",
            "delete from table_two where version = '1.3.6'",
        ],
        post_statements="update table_one set version = '3.0.0'",
    )
    # [end sql_with_pre_post_desc]

    bare_sql >> [
        sql_file,
        sql_with_pre_post,
    ]

    workflow.submit()

You can see that the SQL task supports two ways to declare the SQL to run:

  • Bare SQL: Put a bare SQL statement in the sql parameter, such as select * from table1.

        bare_sql = Sql(
            name="bare_sql",
            datasource_name="metadata",
            sql="select * from t_ds_version",
        )
    
  • SQL files: Put the path of a SQL file in the sql parameter, and use a resource plugin, such as Local, to read the file's content as the SQL statement.

        sql_file = Sql(
            name="sql_file",
            datasource_name="metadata",
            sql="ext/example.sql",
            sql_type=SqlType.SELECT,
            resource_plugin=Local(prefix=str(Path(__file__).parent)),
        )
    

If you want to do some preparation before executing the SQL, or some clean-up after executing it, you can use the pre_statements and post_statements parameters. Both accept one or multiple statements: assign a sequence of SQL statements to execute multiple statements, or assign a single string if you only need to execute one.

    sql_with_pre_post = Sql(
        name="sql_with_pre_post",
        datasource_name="metadata",
        sql="select * from t_ds_version",
        pre_statements=[
            "update table_one set version = '1.3.6'",
            "delete from table_two where version = '1.3.6'",
        ],
        post_statements="update table_one set version = '3.0.0'",
    )

Note

The pre_statements and post_statements parameters currently only support non-query statements, such as create table, drop table, and update table. They also only support bare SQL, not SQL files.

Dive Into

Task sql.

class pydolphinscheduler.tasks.sql.Sql(name: str, datasource_name: str, sql: str, datasource_type: str | None = None, sql_type: str | None = None, pre_statements: str | Sequence[str] | None = None, post_statements: str | Sequence[str] | None = None, display_rows: int | None = 10, *args, **kwargs)[source]

Bases: BatchTask

Task SQL object, declaring the behavior of the SQL task to DolphinScheduler.

It runs SQL jobs in multiple SQL-like engines, such as ClickHouse, DB2, Hive, MySQL, Oracle, PostgreSQL, Presto, and SQL Server. The datasource_name you provide contains the connection information, which determines the database type and database instance that will run this SQL.

Parameters:
  • name – SQL task name

  • datasource_name – The datasource name in DolphinScheduler; the name must exist and must be an online datasource rather than a test one.

  • sql – SQL statement, the SQL script you want to run. This parameter supports resource plugins.

  • sql_type – SQL type, i.e. whether the SQL statement is a select query or not. If not provided, it is auto-detected from the SQL statement by pydolphinscheduler.tasks.sql.Sql.sql_type(); you can also set it manually, with SqlType.SELECT for query statements or SqlType.NOT_SELECT for non-query statements.

  • pre_statements – SQL statements to be executed before the main SQL statement.

  • post_statements – SQL statements to be executed after the main SQL statement.

  • display_rows – The number of record rows to display in the SQL task log; defaults to 10.

static get_stm_list(stm: str | Sequence[str]) list[str][source]

Convert a statement, or a sequence of statements, to a list of strings.

Parameters:

stm – a statement string or a sequence of statement strings

Returns:

a list of statements
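The normalization that get_stm_list performs can be sketched in plain Python as follows (an illustrative re-implementation, not the library's actual code): a bare string is wrapped in a one-element list, while any sequence of statements is returned as a list.

```python
from typing import Sequence, Union


def get_stm_list(stm: Union[str, Sequence[str]]) -> list:
    """Normalize one statement or a sequence of statements to a list of strings.

    Sketch of pydolphinscheduler.tasks.sql.Sql.get_stm_list; the real
    implementation may differ in detail.
    """
    if isinstance(stm, str):
        # A bare string is one statement, not a sequence of characters.
        return [stm]
    return list(stm)


print(get_stm_list("drop table tmp"))      # ['drop table tmp']
print(get_stm_list(["stmt_a", "stmt_b"]))  # ['stmt_a', 'stmt_b']
```

This is why both pre_statements and post_statements accept either a single string or a list, as shown in the examples above.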

_downstream_task_codes: set[int]
_task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property datasource: dict

Get the datasource for the SQL task.

ext: set = {'.sql'}
ext_attr: str = '_sql'
property sql_type: str

Judge the SQL type; returns the SQL type, either SELECT or NOT_SELECT.

If param_sql_type is not specified, a regexp is used to determine the type of the SQL; if param_sql_type is specified, the parameter overrides the regexp detection.
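The auto-detection described above can be sketched with a simple regexp. This is a hypothetical illustration; the pattern the library actually uses may be more elaborate. The return values "0" and "1" mirror the SqlType.SELECT and SqlType.NOT_SELECT constants documented below.

```python
import re

# Hypothetical detection rule: a statement that starts with the keyword
# "select" (ignoring leading whitespace and case) is treated as a query.
SELECT_PATTERN = re.compile(r"^\s*select\b", re.IGNORECASE)

SELECT = "0"      # SqlType.SELECT
NOT_SELECT = "1"  # SqlType.NOT_SELECT


def detect_sql_type(sql: str) -> str:
    """Return SELECT for query statements, NOT_SELECT otherwise."""
    return SELECT if SELECT_PATTERN.match(sql) else NOT_SELECT


print(detect_sql_type("select * from t_ds_version"))              # prints 0
print(detect_sql_type("update table_one set version = '3.0.0'"))  # prints 1
```

Passing sql_type explicitly, as in the sql_file example above, skips this detection entirely.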

property task_params: dict

Override Task.task_params for sql task.

The SQL task has some special attributes for task_params, which would be awkward to set directly as Python properties, so we override Task.task_params here.

class pydolphinscheduler.tasks.sql.SqlType[source]

Bases: object

SQL type; for now it just contains SELECT and NOT_SELECT.

NOT_SELECT = '1'
SELECT = '0'

YAML file example

# Define the workflow
workflow:
  name: "Sql"

# Define the tasks within the workflow
tasks:
  - name: task_base
    task_type: Sql
    datasource_name: "db"
    sql: show tables;

  - name: task_multi_line
    task_type: Sql
    datasource_name: "db"
    sql: |
      show tables;
      select id from version where id=1;

  - name: task_file
    task_type: Sql
    datasource_name: "db"
    sql: $FILE{"example_sql.sql"}

  # Or you can define the task "task_base_one_line" in one line
  - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}

  # Or you can define the task "task_file_one_line" in one line
  - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}

example_sql.sql:

select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;