SQL
An example of the SQL task, including how to use it and the details of its parameters.
This task type can execute SQL against multiple types of databases, including:
- MySQL
- PostgreSQL
- Oracle
- SQL Server
- DB2
- Hive
- Presto
- Trino
- ClickHouse
Example
"""An example workflow for task SQL."""

from pathlib import Path

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql, SqlType

with Workflow(
    name="task_sql_example",
) as workflow:
    # [start bare_sql_desc]
    bare_sql = Sql(
        name="bare_sql",
        datasource_name="metadata",
        sql="select * from t_ds_version",
    )
    # [end bare_sql_desc]

    # [start sql_file_desc]
    sql_file = Sql(
        name="sql_file",
        datasource_name="metadata",
        sql="ext/example.sql",
        sql_type=SqlType.SELECT,
        resource_plugin=Local(prefix=str(Path(__file__).parent)),
    )
    # [end sql_file_desc]

    # [start sql_with_pre_post_desc]
    sql_with_pre_post = Sql(
        name="sql_with_pre_post",
        datasource_name="metadata",
        sql="select * from t_ds_version",
        pre_statements=[
            "update table_one set version = '1.3.6'",
            "delete from table_two where version = '1.3.6'",
        ],
        post_statements="update table_one set version = '3.0.0'",
    )
    # [end sql_with_pre_post_desc]

    bare_sql >> [
        sql_file,
        sql_with_pre_post,
    ]

    workflow.submit()
You can see that the SQL task supports three ways to declare SQL:
Bare SQL: put a bare SQL statement in the sql parameter, such as select * from table1.
bare_sql = Sql(
    name="bare_sql",
    datasource_name="metadata",
    sql="select * from t_ds_version",
)
SQL Files: put the path of a SQL file in the sql parameter and declare a resource plugin so the file can be located.
sql_file = Sql(
    name="sql_file",
    datasource_name="metadata",
    sql="ext/example.sql",
    sql_type=SqlType.SELECT,
    resource_plugin=Local(prefix=str(Path(__file__).parent)),
)
If you want to do some preparation before executing the main SQL, or some clean-up after it, you can use the pre_statements and post_statements parameters. Both support one or multiple statements: assign a sequence of SQL statements to execute multiple statements, or assign a single string if you only need to execute one.
sql_with_pre_post = Sql(
    name="sql_with_pre_post",
    datasource_name="metadata",
    sql="select * from t_ds_version",
    pre_statements=[
        "update table_one set version = '1.3.6'",
        "delete from table_two where version = '1.3.6'",
    ],
    post_statements="update table_one set version = '3.0.0'",
)
Note
The parameters pre_statements and post_statements currently only support non-query statements, such as create table, drop table, and update table. They also only support bare SQL, not SQL files.
Dive Into
Task sql.
- class pydolphinscheduler.tasks.sql.Sql(name: str, datasource_name: str, sql: str, datasource_type: str | None = None, sql_type: str | None = None, pre_statements: str | Sequence[str] | None = None, post_statements: str | Sequence[str] | None = None, display_rows: int | None = 10, *args, **kwargs)[source]
Bases:
Task
Task SQL object, declare behavior for SQL task to dolphinscheduler.
It runs SQL jobs in multiple SQL-like engines, such as: ClickHouse, DB2, Hive, MySQL, Oracle, PostgreSQL, Presto, SQL Server. The datasource_name you provide contains the connection information and decides which database type and database instance will run this SQL.
- Parameters:
name – SQL task name.
datasource_name – datasource name in dolphinscheduler. The name must exist and must be an online datasource instead of a test one.
sql – the SQL statement or script you want to run. This parameter supports resource plugins.
sql_type – SQL type, i.e. whether the statement is a select query or not. If not provided, it is auto-detected from the SQL statement by pydolphinscheduler.tasks.sql.Sql.sql_type(); you can also set it manually, with SqlType.SELECT for query statements or SqlType.NOT_SELECT for non-query statements.
pre_statements – SQL statements to be executed before the main SQL statement.
post_statements – SQL statements to be executed after the main SQL statement.
display_rows – the number of rows to display in the SQL task log, default is 10.
- static get_stm_list(stm: str | Sequence[str]) -> List[str] [source]
Convert a statement, or a sequence of statements, to a list of strings.
- Parameters:
stm – statement string, or sequence of statement strings
- Returns:
list of statements
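The normalization this helper performs can be sketched in plain Python. This is a simplified stand-in written from the documented behavior, not the library's actual code:

```python
from typing import List, Sequence, Union


def get_stm_list(stm: Union[str, Sequence[str]]) -> List[str]:
    """Normalize one statement or a sequence of statements into a list.

    A single string becomes a one-element list; any other sequence
    is copied into a list.
    """
    if isinstance(stm, str):
        return [stm]
    return list(stm)


# Both a bare string and a sequence normalize to a list of statements,
# which is why pre_statements/post_statements accept either form.
print(get_stm_list("update table_one set version = '3.0.0'"))
print(get_stm_list(["stmt one", "stmt two"]))
```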
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- property datasource: Dict
Get the datasource for the SQL task.
- ext: set = {'.sql'}
- ext_attr: str = '_sql'
- property sql_type: str
Judge the SQL type; it returns SqlType.SELECT for query statements or SqlType.NOT_SELECT otherwise.
If the sql_type parameter is not specified, a regular expression is used to detect which type the SQL statement is. If sql_type is specified, the parameter overrides the regexp detection.
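As a rough illustration of regexp-based detection, here is a minimal sketch. The pattern below is an assumption for illustration; the library's actual regular expression may differ:

```python
import re

# Hypothetical pattern: treat statements starting with SELECT (or a
# WITH ... SELECT common table expression) as queries, everything
# else (update, delete, create, drop, ...) as non-query.
_SELECT_PATTERN = re.compile(r"^\s*(select|with)\b", re.IGNORECASE)


def detect_sql_type(sql: str) -> str:
    """Return "SELECT" for query statements, "NOT_SELECT" otherwise."""
    return "SELECT" if _SELECT_PATTERN.match(sql) else "NOT_SELECT"


print(detect_sql_type("select * from t_ds_version"))              # SELECT
print(detect_sql_type("update table_one set version = '3.0.0'"))  # NOT_SELECT
```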
- property task_params: Dict
Override Task.task_params for sql task.
The sql task has some special attributes in task_params, and it would be odd to set them directly as a python property, so we override Task.task_params here.
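The override pattern described here can be sketched generically. The class and key names below are illustrative stand-ins, not the library's actual implementation; the point is that a subclass property extends the parent's parameter dict with its own task-specific attributes:

```python
class Task:
    """Minimal stand-in for a base task with common parameters."""

    def __init__(self, name: str) -> None:
        self.name = name

    @property
    def task_params(self) -> dict:
        return {"name": self.name}


class SqlTask(Task):
    """Sketch: layer SQL-specific keys on top of the base task_params."""

    def __init__(self, name: str, sql: str, sql_type: str) -> None:
        super().__init__(name)
        self.sql = sql
        self.sql_type = sql_type

    @property
    def task_params(self) -> dict:
        # Merge the parent's params with the SQL-specific attributes.
        params = super().task_params
        params.update({"sql": self.sql, "sqlType": self.sql_type})
        return params


task = SqlTask("bare_sql", "select * from t_ds_version", "SELECT")
print(task.task_params)
```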
YAML file example
# Define the workflow
workflow:
name: "Sql"
# Define the tasks within the workflow
tasks:
- name: task_base
task_type: Sql
datasource_name: "db"
sql: show tables;
- name: task_multi_line
task_type: Sql
datasource_name: "db"
sql: |
show tables;
select id from version where id=1;
- name: task_file
task_type: Sql
datasource_name: "db"
sql: $FILE{"example_sql.sql"}
# Or you can define task "task_base_one_line" in one line
- { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}
# Or you can define task "task_file_one_line" in one line
- { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}
example_sql.sql:
select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;