SQL
Task sql.
- class pydolphinscheduler.tasks.sql.Sql(name: str, datasource_name: str, sql: str, sql_type: str | None = None, pre_statements: str | None = None, post_statements: str | None = None, display_rows: int | None = 10, *args, **kwargs)
Bases: Task
Task SQL object; declares the behavior of an SQL task for dolphinscheduler.
It runs SQL jobs on multiple SQL-like engines, such as ClickHouse, DB2, HIVE, MySQL, Oracle, Postgresql, Presto, and SQLServer. The datasource_name you provide contains the connection information and determines which database type and database instance run this SQL.
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- ext: set = {'.sql'}
- ext_attr: str = '_sql'
- property sql_type: str
Determine the SQL type; returns either SELECT or NOT_SELECT.
If param_sql_type is not specified, a regular expression is used to detect the type of the SQL. If param_sql_type is specified, it overrides the regular-expression result.
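The sql_type behavior described above can be sketched as follows. This is only an illustration: the function name guess_sql_type and the keyword pattern are assumptions made for this example, and the regular expression the library actually uses may classify statements differently. The values "0" and "1" are the SqlType constants documented on this page.

```python
import re
from typing import Optional

# Values mirror the SqlType constants documented on this page.
SELECT = "0"
NOT_SELECT = "1"

# Hypothetical pattern (an assumption for this sketch): statements that start
# with one of these keywords are treated as queries that return rows.
_QUERY_PATTERN = re.compile(r"^\s*(select|show|describe)\b", re.IGNORECASE)


def guess_sql_type(sql: str, param_sql_type: Optional[str] = None) -> str:
    """Return SELECT or NOT_SELECT, preferring an explicit param_sql_type."""
    if param_sql_type is not None:
        # An explicit value always overrides the regexp-based guess.
        return param_sql_type
    return SELECT if _QUERY_PATTERN.match(sql) else NOT_SELECT
```

Calling guess_sql_type("show tables;") falls through to the pattern check, while passing param_sql_type skips it entirely, which is the override order the property describes.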
- property task_params: Dict
Override Task.task_params for the SQL task.
The SQL task has some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.
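As a rough illustration of the idea, the sketch below collects the attributes named in _task_custom_attr into a single dictionary. The class SqlParamsSketch is a stand-in invented for this example, not the library's Sql class, and the real task_params may rename keys or add further entries.

```python
from typing import Any, Dict, Optional, Set


class SqlParamsSketch:
    """Stand-in task object for illustration; not the library's Sql class."""

    # Same attribute names as the _task_custom_attr set documented above.
    _task_custom_attr: Set[str] = {
        "display_rows",
        "post_statements",
        "pre_statements",
        "sql",
        "sql_type",
    }

    def __init__(
        self,
        sql: str,
        sql_type: str = "0",
        pre_statements: Optional[str] = None,
        post_statements: Optional[str] = None,
        display_rows: Optional[int] = 10,
    ) -> None:
        self.sql = sql
        self.sql_type = sql_type
        self.pre_statements = pre_statements
        self.post_statements = post_statements
        self.display_rows = display_rows

    @property
    def task_params(self) -> Dict[str, Any]:
        """Gather every custom attribute into one dict (key names kept as-is here)."""
        return {name: getattr(self, name) for name in sorted(self._task_custom_attr)}
```

Building the dictionary in one overridden property keeps the SQL-specific fields together instead of exposing each one as a separately serialized attribute.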
- class pydolphinscheduler.tasks.sql.SqlType
Bases: object
SQL type; for now it contains only SELECT and NOT_SELECT.
- NOT_SELECT = '1'
- SELECT = '0'
YAML file example
# Define the workflow
workflow:
  name: "Sql"

# Define the tasks within the workflow
tasks:
  - name: task_base
    task_type: Sql
    datasource_name: "db"
    sql: show tables;

  - name: task_multi_line
    task_type: Sql
    datasource_name: "db"
    sql: |
      show tables;
      select id from version where id=1;

  - name: task_file
    task_type: Sql
    datasource_name: "db"
    sql: $FILE{"example_sql.sql"}

  # Or define a task like "task_base" in one line
  - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}

  # Or define a task like "task_file" in one line
  - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}
example_sql.sql:
select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;
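The $FILE{"example_sql.sql"} form in the YAML example stands for the content of that file. A minimal sketch of how such a reference could be expanded is shown below. The helper resolve_sql and its base_dir parameter are hypothetical names for this illustration, and resolving the path relative to a given directory is an assumption, not a statement of how the library's YAML loader is implemented.

```python
import re
from pathlib import Path

# Matches the $FILE{"name"} form used in the YAML example above.
_FILE_REF = re.compile(r'^\$FILE\{"(?P<path>[^"]+)"\}$')


def resolve_sql(value: str, base_dir: Path) -> str:
    """Return the file content for a $FILE{"..."} reference, else the value unchanged."""
    match = _FILE_REF.match(value.strip())
    if match is None:
        # Plain SQL text such as "show tables;" is passed through as-is.
        return value
    # Assumption: the referenced path is resolved relative to base_dir.
    return (base_dir / match.group("path")).read_text(encoding="utf-8")
```

With example_sql.sql placed in base_dir, resolve_sql('$FILE{"example_sql.sql"}', base_dir) returns the statements in that file, so task_file and task_file_one_line would both receive the file's text as their sql value.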