API
Core
Init pydolphinscheduler.core package.
- class pydolphinscheduler.core.Database(database_name: str, type_key, database_key, *args, **kwargs)[source]
Bases:
dict
Database object that retrieves information about a database.
The database_name you provide carries the connection information; it determines which database type and database instance run the task.
- clear() None. Remove all items from D.
- copy() a shallow copy of D
- fromkeys(value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- get_database_info(name) Dict [source]
Get database info from the Java gateway; it contains the database id, type, and name.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k].
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v.
In either case, this is followed by: for k in F: D[k] = F[k].
- values() an object providing a view on D's values
- property database_id: str
Get the database id from the Java gateway; a wrapper for get_database_info().
- property database_type: str
Get the database type from the Java gateway; a wrapper for get_database_info().
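The way the database_id and database_type properties wrap get_database_info() can be sketched as follows. This is a minimal stand-in under stated assumptions, not the library's implementation: FakeDatabase, _FAKE_GATEWAY, and the returned keys id, type, and name are hypothetical, and storing the looked-up values under type_key and database_key is an assumption about how those constructor arguments are used. The real class queries the Java gateway.

```python
from typing import Dict

# Hypothetical stand-in for the Java gateway lookup (assumption for illustration).
_FAKE_GATEWAY: Dict[str, Dict] = {
    "metadata": {"id": 1, "type": "MYSQL", "name": "metadata"},
}


class FakeDatabase(dict):
    """Dict subclass whose properties wrap a single lookup call."""

    def __init__(self, database_name: str, type_key: str, database_key: str):
        super().__init__()
        self._database_name = database_name
        # Assumed behavior: store the looked-up values under the caller-chosen keys.
        self[type_key] = self.database_type
        self[database_key] = self.database_id

    def get_database_info(self, name: str) -> Dict:
        # The real method would ask the Java gateway instead of a local dict.
        return _FAKE_GATEWAY[name]

    @property
    def database_id(self):
        return self.get_database_info(self._database_name)["id"]

    @property
    def database_type(self):
        return self.get_database_info(self._database_name)["type"]


db = FakeDatabase("metadata", type_key="type", database_key="datasource")
print(dict(db))
```

Because the object is a dict, it can be merged directly into another parameter dict with update().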
- class pydolphinscheduler.core.ProcessDefinition(name: str, description: str | None = None, schedule: str | None = None, start_time: str | None = None, end_time: str | None = None, timezone: str | None = 'Asia/Shanghai', user: str | None = 'userPythonGateway', project: str | None = 'project-pydolphin', tenant: str | None = 'tenant_pydolphin', queue: str | None = 'queuePythonGateway', worker_group: str | None = 'default', timeout: int | None = 0, release_state: str | None = 'ONLINE', param: Dict | None = None)[source]
Bases:
Base
Process definition object; it defines the process definition's attributes, tasks, and relations.
TODO: maybe we should rename this class; it currently uses the DS object name.
- _ensure_side_model_exists()[source]
Ensure the process definition's side models exist.
For now, the side objects include pydolphinscheduler.side.project.Project, pydolphinscheduler.side.tenant.Tenant, and pydolphinscheduler.side.user.User. If these models do not exist, they are created with the default values in pydolphinscheduler.constants.ProcessDefinitionDefault.
- _handle_root_relation()[source]
Handle the root task property pydolphinscheduler.core.task.TaskRelation.
A root task in the DAG has no upstream node, but we have to add a default upstream task with task_code equal to 0. This is required by the Java gateway interface.
- _pre_submit_check()[source]
Check that specific conditions are satisfied before submitting.
This method should be called before the process definition is submitted to the Java gateway. For now, it performs the following check:
* self.param should be set if the workflow contains a switch task.
- add_tasks(tasks: List[Task]) None [source]
Add a sequence of tasks to the process definition; a wrapper for add_task().
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- get_one_task_by_name(name: str) Task [source]
Get exactly one task from the process definition by the given name.
The function always returns one task, even when the process definition has more than one task with this name.
- get_tasks_by_name(name: str) Set[Task] [source]
Get task objects by the given name; it returns all tasks with this name.
- run()[source]
Submit and start the ProcessDefinition instance.
Shortcut for the functions submit() and start(). Only manually started workflows are supported for now; scheduled runs are coming soon.
- start() None [source]
Create and start the ProcessDefinition instance.
It posts to start-process-instance on the Java gateway.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'_project', '_tenant', 'description', 'name', 'param', 'release_state', 'task_definition_json', 'task_relation_json', 'tasks', 'timeout', 'worker_group'}
- _KEY_ATTR: set = {'name', 'param', 'project', 'release_state', 'tenant'}
- property end_time: Any
Get attribute end_time.
- property param_json: List[Dict] | None
Return the param JSON based on self.param.
- property schedule_json: Dict | None
Get the schedule parameter JSON object. This is required by the Java gateway interface.
- property start_time: Any
Get attribute start_time.
- property task_definition_json: List[Dict]
Return all task definitions as a list of dicts.
- property task_location: List[Dict]
Return the locations of all tasks in the process definition.
For now, we set every location to the same x and y values, both equal to 0, because we have not found a good way to set task locations. This is required by the Java gateway interface.
- property task_relation_json: List[Dict]
Return all relations between task pairs as a list of dicts.
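The root-task handling described for _handle_root_relation() and the fixed coordinates described for task_location can be sketched as below. This is an illustration under assumptions, not the library's code: build_relations, build_locations, and the dict keys preTaskCode, postTaskCode, taskCode, x, and y are hypothetical names chosen for the example.

```python
from typing import Dict, List, Set, Tuple


def build_relations(task_codes: List[int], edges: Set[Tuple[int, int]]) -> List[Dict]:
    """Return relation dicts, giving every root task a default upstream of code 0."""
    # A task is a root when no edge points at it.
    has_upstream = {post for _, post in edges}
    relations = [
        {"preTaskCode": pre, "postTaskCode": post} for pre, post in sorted(edges)
    ]
    for code in task_codes:
        if code not in has_upstream:
            relations.append({"preTaskCode": 0, "postTaskCode": code})
    return relations


def build_locations(task_codes: List[int]) -> List[Dict]:
    """Place every task at the same coordinates, x = 0 and y = 0."""
    return [{"taskCode": code, "x": 0, "y": 0} for code in task_codes]


# Task 101 is the root; 102 and 103 both depend on it.
relations = build_relations([101, 102, 103], {(101, 102), (101, 103)})
locations = build_locations([101, 102, 103])
print(relations)
print(locations)
```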
- class pydolphinscheduler.core.Task(name: str, task_type: str, description: str | None = None, flag: str | None = 'YES', task_priority: str | None = 'MEDIUM', worker_group: str | None = 'default', delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_flag: int | None = 'CLOSE', timeout_notify_strategy: Optional = None, timeout: int | None = 0, process_definition: ProcessDefinition | None = None, local_params: List | None = None, resource_list: List | None = None, dependence: Dict | None = None, wait_start_timeout: Dict | None = None, condition_result: Dict | None = None)[source]
Bases:
Base
Task object, the parent class for all concrete task types.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None [source]
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple [source]
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None [source]
Set the tasks parameter as downstream of the current task.
- set_upstream(tasks: Task | Sequence[Task]) None [source]
Set the tasks parameter as upstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _task_custom_attr: set = {}
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict | None
Get the task parameter object.
The result combines _task_custom_attr and custom_attr.
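How set_upstream(), set_downstream(), and _set_deps() relate can be sketched with a small stand-in class. MiniTask is hypothetical and written only for illustration. That the two public methods delegate to _set_deps(), and that both sides of a relation are recorded in sets of task codes, are assumptions made for this example rather than a description of the library's internals.

```python
from typing import Sequence, Set, Union


class MiniTask:
    """Hypothetical stand-in that only tracks upstream and downstream codes."""

    def __init__(self, name: str, code: int):
        self.name = name
        self.code = code
        self._upstream_task_codes: Set[int] = set()
        self._downstream_task_codes: Set[int] = set()

    def _set_deps(
        self, tasks: Union["MiniTask", Sequence["MiniTask"]], upstream: bool = True
    ) -> None:
        # Accept a single task or a sequence of tasks.
        if isinstance(tasks, MiniTask):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self._upstream_task_codes.add(task.code)
                task._downstream_task_codes.add(self.code)
            else:
                self._downstream_task_codes.add(task.code)
                task._upstream_task_codes.add(self.code)

    def set_upstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=False)


extract = MiniTask("extract", 1)
transform = MiniTask("transform", 2)
load = MiniTask("load", 3)

extract.set_downstream([transform, load])  # extract runs before both
load.set_upstream(transform)               # transform runs before load
```

Recording the relation on both tasks keeps the two views consistent, whichever method the caller uses.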
Sides
Init Side package. The Side package keeps objects that are related to DolphinScheduler but are not part of Core.
- class pydolphinscheduler.side.Project(name: str = 'project-pydolphin', description: str | None = None)[source]
Bases:
BaseSide
DolphinScheduler Project object.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.side.Queue(name: str = 'queuePythonGateway', description: str | None = '')[source]
Bases:
BaseSide
DolphinScheduler Queue object.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.side.Tenant(name: str = 'tenant_pydolphin', queue: str = 'queuePythonGateway', description: str | None = None)[source]
Bases:
BaseSide
DolphinScheduler Tenant object.
- create_if_not_exists(queue_name: str, user='userPythonGateway') None [source]
Create the Tenant if it does not exist.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.side.User(name: str, password: str, email: str, phone: str, tenant: str, queue: str | None = None, status: int | None = 1)[source]
Bases:
BaseSide
DolphinScheduler User object.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'email', 'name', 'password', 'phone', 'queue', 'status', 'tenant'}
- class pydolphinscheduler.side.WorkerGroup(name: str, address: str, description: str | None = None)[source]
Bases:
BaseSide
DolphinScheduler Worker Group object.
- classmethod create_if_not_exists(user='userPythonGateway')
Create Base if it does not exist.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
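The classes above all expose get_define() and get_define_custom(). A minimal sketch of how camel_attr and _DEFINE_ATTR could interact is given here. MiniBase and snake_to_camel are hypothetical stand-ins written for illustration, and the exact key conversion used by pydolphinscheduler may differ.

```python
from typing import Dict, Optional, Set


def snake_to_camel(name: str) -> str:
    """Convert a snake_case name such as 'release_state' to 'releaseState'."""
    head, *rest = name.split("_")
    return head + "".join(part.title() for part in rest)


class MiniBase:
    # Attributes that would be sent to the gateway (assumed for this example).
    _DEFINE_ATTR: Set[str] = {"name", "release_state", "worker_group"}

    def __init__(
        self, name: str, release_state: str = "ONLINE", worker_group: str = "default"
    ):
        self.name = name
        self.release_state = release_state
        self.worker_group = worker_group

    def get_define_custom(
        self, camel_attr: bool = True, custom_attr: Optional[Set[str]] = None
    ) -> Dict:
        # Fall back to the class-level set when no custom set is given.
        attrs = self._DEFINE_ATTR if custom_attr is None else custom_attr
        return {
            (snake_to_camel(attr) if camel_attr else attr): getattr(self, attr)
            for attr in attrs
        }

    def get_define(self, camel_attr: bool = True) -> Dict:
        return self.get_define_custom(camel_attr, self._DEFINE_ATTR)


workflow = MiniBase(name="demo")
camel = workflow.get_define()
snake = workflow.get_define(camel_attr=False)
only_name = workflow.get_define_custom(custom_attr={"name"})
```

Keeping the Python attributes in snake_case and converting only at the boundary lets the Python side stay idiomatic while the Java side receives camelCase keys.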
Tasks
Init pydolphinscheduler.tasks package.
- class pydolphinscheduler.tasks.Condition(name: str, condition: ConditionOperator, success_task: Task, failed_task: Task, *args, **kwargs)[source]
Bases:
Task
Condition task object; declares the behavior of a condition task for DolphinScheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get the condition result definition for the Java gateway.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for the Condition task.
The Condition task has a special attribute, dependence; in most tasks this attribute is None and an empty dict {} is used as the default value. We do not use the class attribute _task_custom_attr, to avoid overwriting attributes.
- class pydolphinscheduler.tasks.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: List[str] | None = None, post_statements: List[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]
Bases:
Task
DataX task object; declares the behavior of a DataX task for DolphinScheduler.
It runs a DataX job against multiple SQL-like engines, such as MySQL, Oracle, Postgresql, and SQLServer. The datasource_name and datatarget_name you provide carry the connection information; they determine which database types and database instances the data is synchronized between.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- CUSTOM_CONFIG = 0
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for the DataX task.
The DataX task has some special attributes for task_params, and it would be odd to set them directly as Python properties, so we override Task.task_params here.
- class pydolphinscheduler.tasks.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)[source]
Bases:
Task
Dependent task object; declares the behavior of a dependent task for DolphinScheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for the dependent task.
The dependent task has a special attribute, dependence; in most tasks this attribute is None and an empty dict {} is used as the default value. We do not use the class attribute _task_custom_attr, to avoid overwriting attributes.
- class pydolphinscheduler.tasks.Flink(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', flink_version: FlinkVersion | None = '<1.10', app_name: str | None = None, job_manager_memory: str | None = '1G', task_manager_memory: str | None = '2G', slot: int | None = 1, task_manager: int | None = 2, parallelism: int | None = 1, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]
Bases:
Engine
Flink task object; declares the behavior of a Flink task for DolphinScheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- get_jar_id() int
Get the jar id from the Java gateway; a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)
Get resource info from the Java gateway; it contains the resource id and name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'deploy_mode', 'flink_version', 'job_manager_memory', 'main_args', 'others', 'parallelism', 'slot', 'task_manager', 'task_manager_memory'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for engine child tasks.
Child tasks have some special attributes for task_params, and it would be odd to set them directly as Python properties, so we override Task.task_params here.
- class pydolphinscheduler.tasks.Http(name: str, url: str, http_method: str | None = 'GET', http_params: str | None = None, http_check_condition: str | None = 'STATUS_CODE_DEFAULT', condition: str | None = None, connect_timeout: int | None = 60000, socket_timeout: int | None = 60000, *args, **kwargs)[source]
Bases:
Task
HTTP task object; declares the behavior of an HTTP task for DolphinScheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'condition', 'connect_timeout', 'http_check_condition', 'http_method', 'http_params', 'socket_timeout', 'url'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict | None
Get the task parameter object.
The result combines _task_custom_attr and custom_attr.
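The combination of _task_custom_attr with a shared attribute set, as described for task_params, can be sketched like this. MiniHttpTask and snake_to_camel are hypothetical stand-ins. Treating local_params as the shared attribute and emitting camelCase keys are assumptions made for the example, not confirmed library behavior.

```python
from typing import Dict, Optional


def snake_to_camel(name: str) -> str:
    """Convert a snake_case name such as 'http_method' to 'httpMethod'."""
    head, *rest = name.split("_")
    return head + "".join(part.title() for part in rest)


class MiniHttpTask:
    # Task-specific attributes (a subset of the set listed for Http above).
    _task_custom_attr = {"url", "http_method", "connect_timeout"}

    def __init__(
        self,
        url: str,
        http_method: str = "GET",
        connect_timeout: int = 60000,
        local_params: Optional[list] = None,
    ):
        self.url = url
        self.http_method = http_method
        self.connect_timeout = connect_timeout
        self.local_params = local_params or []

    @property
    def task_params(self) -> Dict:
        # Assumed shared attribute set, merged with the task-specific one.
        custom_attr = {"local_params"} | self._task_custom_attr
        return {snake_to_camel(attr): getattr(self, attr) for attr in custom_attr}


params = MiniHttpTask(url="https://example.com/health").task_params
print(sorted(params))
```

With this layout, a new task type only has to list its own attribute names in _task_custom_attr to have them included in task_params.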
- class pydolphinscheduler.tasks.MR(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', app_name: str | None = None, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]
Bases:
Engine
MR task object; declares the behavior of an MR task for DolphinScheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- get_jar_id() int
Get the jar id from the Java gateway; a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)
Get resource info from the Java gateway; it contains the resource id and name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'main_args', 'others'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for engine child tasks.
Child tasks have some special attributes for task_params, and it would be odd to set them directly as Python properties, so we override Task.task_params here.
- class pydolphinscheduler.tasks.Procedure(name: str, datasource_name: str, method: str, *args, **kwargs)[source]
Bases:
Task
Procedure task object; declares the behavior of a Procedure task for DolphinScheduler.
It runs a database procedure job against multiple SQL-like engines, such as ClickHouse, DB2, HIVE, MySQL, Oracle, Postgresql, Presto, and SQLServer. The datasource_name you provide carries the connection information; it determines which database type and database instance run this SQL.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'method'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for the procedure task.
The procedure task has some special attributes for task_params, and it would be odd to set them directly as Python properties, so we override Task.task_params here.
- class pydolphinscheduler.tasks.Python(name: str, code: Any, *args, **kwargs)[source]
Bases:
Task
Python task object; declares the behavior of a Python task for DolphinScheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'raw_script'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property raw_script: str
Get the raw_script definition attribute of the Python task.
- property task_params: Dict | None
Get the task parameter object.
The result combines _task_custom_attr and custom_attr.
- class pydolphinscheduler.tasks.Shell(name: str, command: str, *args, **kwargs)[source]
Bases:
Task
Shell task object; declares the behavior of a shell task for DolphinScheduler.
- Parameters:
name – A unique, meaningful string for the shell task.
command –
One or more commands to run in this task.
It can be a single command:
Shell(name=..., command="echo task shell")
or several commands that together perform a more complex task:
command = '''echo task shell step 1;
echo task shell step 2;
echo task shell step 3
'''
Shell(name=..., command=command)
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the tasks parameter and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes to communicate to the Java gateway server.
Uses the attribute self._DEFINE_ATTR to determine which attributes to include when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set the tasks parameter as downstream of the current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'raw_script'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
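A minimal sketch of how task_params could merge a shared attribute set with the task-specific _task_custom_attr. MiniShell is a hypothetical stand-in, and the base set used here is illustrative only; the attributes the library actually includes may differ.

```python
class MiniShell:
    """Hypothetical stand-in showing how task_params could merge two attribute sets."""

    # Task-specific attributes, as listed for Shell above.
    _task_custom_attr = {"raw_script"}

    def __init__(self, raw_script):
        self.raw_script = raw_script
        self.local_params = []
        self.resource_list = []

    @property
    def task_params(self):
        # Illustrative base set; the real library's base attributes may differ.
        custom_attr = {"local_params", "resource_list"}
        # Combine the shared attributes with the task-specific ones.
        custom_attr = custom_attr | self._task_custom_attr
        return {name: getattr(self, name) for name in sorted(custom_attr)}


params = MiniShell("echo task shell").task_params
print(params)
```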
- class pydolphinscheduler.tasks.Spark(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', spark_version: SparkVersion | None = 'SPARK2', app_name: str | None = None, driver_cores: int | None = 1, driver_memory: str | None = '512M', num_executors: int | None = 2, executor_memory: str | None = '2G', executor_cores: int | None = 2, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]
Bases:
Engine
Task spark object, declare behavior for spark task to dolphinscheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set parameter tasks dependent to current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not already exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
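The lookup-or-create behavior described for gen_code_and_version() can be sketched with an in-memory stand-in. FakeGateway and its starting code of 1000 are hypothetical; the real values come from the Java gateway.

```python
class FakeGateway:
    """Hypothetical in-memory stand-in for the gateway's code/version lookup."""

    def __init__(self):
        self._tasks = {}  # (process_definition, task_name) -> (code, version)
        self._next_code = 1000  # arbitrary starting code for the sketch

    def gen_code_and_version(self, process_definition, task_name):
        key = (process_definition, task_name)
        if key not in self._tasks:
            # Unknown task name: allocate a fresh code and start at version 0.
            self._tasks[key] = (self._next_code, 0)
            self._next_code += 1
        # Known task name: return the stored code and version unchanged.
        return self._tasks[key]


gateway = FakeGateway()
first = gateway.gen_code_and_version("demo", "spark_job")
again = gateway.gen_code_and_version("demo", "spark_job")
other = gateway.gen_code_and_version("demo", "other_job")
print(first, again, other)
```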
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- get_jar_id() int
Get jar id from java gateway, a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)
Get resource info from java gateway, contains resource id, name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'deploy_mode', 'driver_cores', 'driver_memory', 'executor_cores', 'executor_memory', 'main_args', 'num_executors', 'others', 'spark_version'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for engine children task.
Child tasks have some special attributes for task_params, and it would be odd to set them directly as Python properties, so Task.task_params is overridden here.
- class pydolphinscheduler.tasks.Sql(name: str, datasource_name: str, sql: str, pre_statements: str | None = None, post_statements: str | None = None, display_rows: int | None = 10, *args, **kwargs)[source]
Bases:
Task
Task SQL object, declare behavior for SQL task to dolphinscheduler.
It can run SQL jobs on multiple SQL-like engines, such as ClickHouse, DB2, Hive, MySQL, Oracle, PostgreSQL, Presto, and SQLServer.
The datasource_name you provide contains the connection information and determines which database type and database instance run this SQL.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set parameter tasks dependent to current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not already exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
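A minimal sketch of what get_define() describes: only the names in _DEFINE_ATTR are exported, and camel_attr switches the keys between snake_case and camelCase. MiniTask and snake_to_camel are hypothetical, and the sketch uses only a subset of the documented attributes.

```python
def snake_to_camel(name):
    """Convert snake_case to lowerCamelCase (hypothetical helper)."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


class MiniTask:
    """Hypothetical stand-in exposing only a few of the documented attributes."""

    _DEFINE_ATTR = {"name", "task_type", "fail_retry_times", "worker_group"}

    def __init__(self, name, task_type):
        self.name = name
        self.task_type = task_type
        self.fail_retry_times = 0
        self.worker_group = "default"
        self.scratch = "not exported"  # not in _DEFINE_ATTR, so never included

    def get_define(self, camel_attr=True):
        define = {}
        for attr in sorted(self._DEFINE_ATTR):
            # Rename the key only; the value is read from the snake_case attribute.
            key = snake_to_camel(attr) if camel_attr else attr
            define[key] = getattr(self, attr)
        return define


task = MiniTask(name="query", task_type="SQL")
print(task.get_define())
print(task.get_define(camel_attr=False))
```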
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property sql_type: int
Determine the SQL type, using a regular expression to check which type the SQL statement is.
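A rough sketch of regular-expression-based classification in the spirit of sql_type. The pattern, the function name guess_sql_type, and the numeric codes 0 and 1 are all assumptions made for illustration; the library's own pattern and values may differ.

```python
import re

# Assumed numeric codes for illustration; check the library for the real values.
SELECT = 0
NOT_SELECT = 1

# In this sketch a statement counts as a query only if it starts with SELECT.
_SELECT_PATTERN = re.compile(r"^\s*select\b", re.IGNORECASE)


def guess_sql_type(sql):
    """Classify a statement as SELECT or NOT_SELECT by its leading keyword."""
    return SELECT if _SELECT_PATTERN.match(sql) else NOT_SELECT


print(guess_sql_type("select id from users"))
print(guess_sql_type("  INSERT INTO users VALUES (1)"))
```

A leading-keyword check is deliberately simple; statements that begin with a comment or a common table expression would need a more careful pattern.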
- property task_params: Dict
Override Task.task_params for sql task.
The SQL task has some special attributes for task_params, and it would be odd to set them directly as Python properties, so Task.task_params is overridden here.
- class pydolphinscheduler.tasks.SubProcess(name: str, process_definition_name: str, *args, **kwargs)[source]
Bases:
Task
Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set parameter tasks dependent to current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not already exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- get_process_definition_info(process_definition_name: str) Dict [source]
Get process definition info from java gateway, contains process definition id, name, code.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'process_definition_code'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property process_definition_code: str
Get process definition code, a wrapper for get_process_definition_info().
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- class pydolphinscheduler.tasks.Switch(name: str, condition: SwitchCondition, *args, **kwargs)[source]
Bases:
Task
Task switch object, declare behavior for switch task to dolphinscheduler.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set parameter tasks dependent to current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not already exist in the process definition, the Java gateway generates a new code and a version id equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property task_params: Dict
Override Task.task_params for switch task.
The switch task has a special attribute, switch; in most other tasks this attribute is None and an empty dict {} is used as the default value. The class attribute _task_custom_attr is not used here, to avoid overwriting attributes.
Constants
Constants for pydolphinscheduler.
- class pydolphinscheduler.constants.DefaultTaskCodeNum[source]
Bases:
str
Constants and default value for default task code number.
- DEFAULT = 1
- class pydolphinscheduler.constants.Delimiter[source]
Bases:
str
Constants for delimiter.
- BAR = '-'
- COLON = ':'
- DASH = '/'
- DIRECTION = '->'
- UNDERSCORE = '_'
- class pydolphinscheduler.constants.JavaGatewayDefault[source]
Bases:
str
Constants and default value for java gateway.
- AUTO_CONVERT = True
- RESULT_DATA = 'data'
- RESULT_MESSAGE_KEYWORD = 'msg'
- RESULT_MESSAGE_SUCCESS = 'success'
- RESULT_STATUS_KEYWORD = 'status'
- RESULT_STATUS_SUCCESS = 'SUCCESS'
- SERVER_ADDRESS = '127.0.0.1'
- SERVER_PORT = 25333
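The result-related constants above suggest how a gateway response might be checked. The sketch below copies the constant values from JavaGatewayDefault, but the layout of the result dict and the function name unwrap_result are assumptions inferred from the constant names, not confirmed behavior.

```python
# Values copied from JavaGatewayDefault above.
RESULT_STATUS_KEYWORD = "status"
RESULT_STATUS_SUCCESS = "SUCCESS"
RESULT_MESSAGE_KEYWORD = "msg"
RESULT_DATA = "data"


def unwrap_result(result):
    """Return the payload of a gateway-style result dict, or raise on failure.

    The dict layout is an assumption inferred from the constant names.
    """
    if result.get(RESULT_STATUS_KEYWORD) != RESULT_STATUS_SUCCESS:
        raise RuntimeError(result.get(RESULT_MESSAGE_KEYWORD, "unknown gateway error"))
    return result.get(RESULT_DATA)


ok = {"status": "SUCCESS", "msg": "success", "data": {"id": 7}}
print(unwrap_result(ok))
```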
- class pydolphinscheduler.constants.ProcessDefinitionDefault[source]
Bases:
object
Default constant values for pydolphinscheduler.core.process_definition.ProcessDefinition.
- PROJECT: str = 'project-pydolphin'
- QUEUE: str = 'queuePythonGateway'
- TENANT: str = 'tenant_pydolphin'
- TIME_ZONE: str = 'Asia/Shanghai'
- USER: str = 'userPythonGateway'
- USER_EMAIL: str = 'userPythonGateway@dolphinscheduler.com'
- USER_PHONE: str = '11111111111'
- USER_PWD: str = 'userPythonGateway'
- USER_STATE: int = 1
- WORKER_GROUP: str = 'default'
- class pydolphinscheduler.constants.ProcessDefinitionReleaseState[source]
Bases:
object
Constants for pydolphinscheduler.core.process_definition.ProcessDefinition release state.
- OFFLINE: str = 'OFFLINE'
- ONLINE: str = 'ONLINE'
- class pydolphinscheduler.constants.TaskFlag[source]
Bases:
str
Constants for task flag.
- NO = 'NO'
- YES = 'YES'
- class pydolphinscheduler.constants.TaskPriority[source]
Bases:
str
Constants for task priority.
- HIGH = 'HIGH'
- HIGHEST = 'HIGHEST'
- LOW = 'LOW'
- LOWEST = 'LOWEST'
- MEDIUM = 'MEDIUM'
- class pydolphinscheduler.constants.TaskTimeoutFlag[source]
Bases:
str
Constants for task timeout flag.
- CLOSE = 'CLOSE'
- class pydolphinscheduler.constants.TaskType[source]
Bases:
str
Constants for task types; this also lists the task types supported so far.
- CONDITIONS = 'CONDITIONS'
- DATAX = 'DATAX'
- DEPENDENT = 'DEPENDENT'
- FLINK = 'FLINK'
- HTTP = 'HTTP'
- MR = 'MR'
- PROCEDURE = 'PROCEDURE'
- PYTHON = 'PYTHON'
- SHELL = 'SHELL'
- SPARK = 'SPARK'
- SQL = 'SQL'
- SUB_PROCESS = 'SUB_PROCESS'
- SWITCH = 'SWITCH'
Exceptions
Exceptions for pydolphinscheduler.
- exception pydolphinscheduler.exceptions.PyDSBaseException[source]
Bases:
Exception
Base exception for pydolphinscheduler.
- exception pydolphinscheduler.exceptions.PyDSJavaGatewayException[source]
Bases:
PyDSBaseException
Exception for pydolphinscheduler Java gateway error.
- exception pydolphinscheduler.exceptions.PyDSParamException[source]
Bases:
PyDSBaseException
Exception for pydolphinscheduler parameter verification error.
- exception pydolphinscheduler.exceptions.PyDSProcessDefinitionNotAssignException[source]
Bases:
PyDSBaseException
Exception for pydolphinscheduler process definition not assigned error.
- exception pydolphinscheduler.exceptions.PyDSTaskNoFoundException[source]
Bases:
PyDSBaseException
Exception for pydolphinscheduler workflow task not found error.
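Because every exception listed here derives from PyDSBaseException, a single except clause on the base class covers them all. The sketch below redeclares two of the classes locally so that it runs without the library; check_timeout and safe_timeout are hypothetical helpers used only to trigger and catch the error.

```python
class PyDSBaseException(Exception):
    """Local stand-in mirroring the documented base exception."""


class PyDSParamException(PyDSBaseException):
    """Local stand-in mirroring the documented parameter exception."""


def check_timeout(timeout):
    # Hypothetical validation used only to trigger the exception.
    if timeout < 0:
        raise PyDSParamException("timeout must not be negative")
    return timeout


def safe_timeout(timeout):
    try:
        return check_timeout(timeout)
    except PyDSBaseException as err:
        # Catching the base class also covers every documented subclass.
        return f"rejected: {err}"


print(safe_timeout(30))
print(safe_timeout(-1))
```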