API

Core

Init pydolphinscheduler.core package.

class pydolphinscheduler.core.Database(database_name: str, type_key, database_key, *args, **kwargs)

Bases: dict

Database object that retrieves information about a database.

The database_name you provide carries the connection information and determines which database type and database instance the task runs on.
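Because Database subclasses dict, an instance can be passed wherever a plain mapping is expected. The sketch below illustrates that design under stated assumptions: the in-memory lookup table stands in for the Java gateway, the FakeDatabase name and the example keys "type" and "datasource" are hypothetical, and it assumes (as the type_key and database_key parameters suggest) that the type and id are stored under those caller-chosen keys. It is not the library's implementation.

```python
from typing import Dict

# Hypothetical stand-in for the Java gateway: database name -> database info.
_FAKE_GATEWAY: Dict[str, Dict[str, str]] = {
    "mysql_conn": {"id": "1", "type": "MYSQL", "name": "mysql_conn"},
}


class FakeDatabase(dict):
    """Sketch of a dict subclass holding a database's type and id under caller-chosen keys."""

    def __init__(self, database_name: str, type_key: str, database_key: str) -> None:
        super().__init__()
        self.database_name = database_name
        # Store the looked-up values under the keys the caller asked for.
        self[type_key] = self.database_type
        self[database_key] = self.database_id

    def get_database_info(self, name: str) -> Dict[str, str]:
        # The real class queries the Java gateway; this sketch reads the fake table.
        return _FAKE_GATEWAY[name]

    @property
    def database_id(self) -> str:
        return self.get_database_info(self.database_name)["id"]

    @property
    def database_type(self) -> str:
        return self.get_database_info(self.database_name)["type"]


db = FakeDatabase("mysql_conn", type_key="type", database_key="datasource")
print(db)  # {'type': 'MYSQL', 'datasource': '1'}
```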

clear() → None.  Remove all items from D.
copy() → a shallow copy of D
fromkeys(iterable, value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

get_database_info(name) → Dict

Get database information from the Java gateway, including the database id, type, and name.

items() → a set-like object providing a view on D's items
keys() → a set-like object providing a view on D's keys
pop(k[, d]) → v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v
In either case, this is followed by: for k in F: D[k] = F[k]
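Database inherits update() unchanged from dict, so the standard semantics apply. A short illustration of the three paths on a plain dict:

```python
d = {"a": 1}

# E has .keys(): behaves like `for k in E: d[k] = E[k]`.
d.update({"b": 2})

# E lacks .keys(): treated as an iterable of (key, value) pairs.
d.update([("c", 3), ("a", 10)])

# Keyword arguments (F) are applied after E, so they win on conflicts.
d.update({"d": 4}, d=40)

print(d)  # {'a': 10, 'b': 2, 'c': 3, 'd': 40}
```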

values() → an object providing a view on D's values
property database_id: str

Get the database id from the Java gateway; a wrapper for get_database_info().

property database_type: str

Get the database type from the Java gateway; a wrapper for get_database_info().

class pydolphinscheduler.core.ProcessDefinition(name: str, description: str | None = None, schedule: str | None = None, start_time: str | None = None, end_time: str | None = None, timezone: str | None = 'Asia/Shanghai', user: str | None = 'userPythonGateway', project: str | None = 'project-pydolphin', tenant: str | None = 'tenant_pydolphin', queue: str | None = 'queuePythonGateway', worker_group: str | None = 'default', timeout: int | None = 0, release_state: str | None = 'ONLINE', param: Dict | None = None)

Bases: Base

Process definition object; defines the process definition's attributes, tasks, and task relations.

TODO: consider renaming this class; it currently uses the DolphinScheduler (DS) object name.

_ensure_side_model_exists()

Ensure the process definition's side models exist.

Currently the side objects are pydolphinscheduler.side.project.Project, pydolphinscheduler.side.tenant.Tenant, and pydolphinscheduler.side.user.User. If any of these models does not exist, it is created with the default values from pydolphinscheduler.constants.ProcessDefinitionDefault.

_handle_root_relation()

Handle the root task property pydolphinscheduler.core.task.TaskRelation.

Root tasks in the DAG have no upstream node, but the Java gateway interface requires each of them to have an explicit default upstream task with task_code equal to 0.
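The root-relation rule can be sketched as follows. This is a hypothetical helper, not the library's code: it takes a mapping from each task code to its upstream codes and emits a relation with a pre-task code of 0 for every task that has none. The key names preTaskCode and postTaskCode are an assumption about the camelCase form sent to the gateway.

```python
from typing import Dict, List, Set


def root_relations(upstream: Dict[int, Set[int]]) -> List[Dict[str, int]]:
    """Return a relation with preTaskCode 0 for every task without an upstream task.

    `upstream` maps each task code to the set of its upstream task codes.
    """
    relations = []
    for code in sorted(upstream):  # sorted only to make the output deterministic
        if not upstream[code]:
            # Root task: attach the default upstream task code 0.
            relations.append({"preTaskCode": 0, "postTaskCode": code})
    return relations


# Tasks 1 and 3 are roots; task 2 depends on task 1.
print(root_relations({1: set(), 2: {1}, 3: set()}))
# [{'preTaskCode': 0, 'postTaskCode': 1}, {'preTaskCode': 0, 'postTaskCode': 3}]
```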

static _parse_datetime(val: Any) → Any
_pre_submit_check()

Check that the required conditions are satisfied before submission.

This method should be called before the process definition is submitted to the Java gateway. Currently it performs the following check:

  • self.param must be set if the workflow contains a Switch task.
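A minimal sketch of that check, assuming task types are available as upper-case strings such as "SWITCH". The function name and ParamMissingError are hypothetical stand-ins; the library raises its own exception type.

```python
from typing import Dict, List, Optional


class ParamMissingError(ValueError):
    """Hypothetical stand-in for the library's parameter exception."""


def pre_submit_check(task_types: List[str], param: Optional[Dict]) -> None:
    """Raise if the workflow contains a Switch task but no param is set."""
    if param is None and "SWITCH" in task_types:
        raise ParamMissingError("param must be set when the workflow contains a Switch task")


pre_submit_check(["SHELL", "SQL"], param=None)         # passes: no Switch task
pre_submit_check(["SHELL", "SWITCH"], param={"x": 1})  # passes: param is set
```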

add_task(task: Task) → None

Add a single task to the process definition.

add_tasks(tasks: List[Task]) → None

Add a sequence of tasks to the process definition; a wrapper for add_task().

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
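The role of _DEFINE_ATTR can be sketched with a hypothetical class: only attributes named in the set are exported, and camel_attr=True is assumed to turn snake_case names into camelCase keys, as the parameter name suggests. The helper snake2camel and the class DefineSketch are illustrations, not library code.

```python
from typing import Any, Dict


def snake2camel(name: str) -> str:
    """Convert snake_case to camelCase, e.g. 'release_state' -> 'releaseState'."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


class DefineSketch:
    """Hypothetical object showing how _DEFINE_ATTR drives get_define()."""

    _DEFINE_ATTR = {"name", "release_state", "worker_group"}

    def __init__(self) -> None:
        self.name = "demo"
        self.release_state = "ONLINE"
        self.worker_group = "default"
        self.secret = "not exported"  # not listed in _DEFINE_ATTR, so never sent

    def get_define(self, camel_attr: bool = True) -> Dict[str, Any]:
        content: Dict[str, Any] = {}
        # sorted() only makes the output order deterministic for this demo.
        for attr in sorted(self._DEFINE_ATTR):
            key = snake2camel(attr) if camel_attr else attr
            content[key] = getattr(self, attr, None)
        return content


print(DefineSketch().get_define())
# {'name': 'demo', 'releaseState': 'ONLINE', 'workerGroup': 'default'}
```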

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

get_one_task_by_name(name: str) → Task

Get exactly one task from the process definition by name.

This function always returns a single task, even if the process definition has more than one task with this name.
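Since get_tasks_by_name() returns a set, "one task" means an arbitrary one when names collide. A sketch with hypothetical stubs; raising LookupError when no task matches is an assumption, not necessarily what the library does.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass(frozen=True)
class TaskStub:
    """Hypothetical minimal task: just a name and a code (frozen so it is hashable)."""
    name: str
    code: int


def get_tasks_by_name(tasks: List[TaskStub], name: str) -> Set[TaskStub]:
    return {task for task in tasks if task.name == name}


def get_one_task_by_name(tasks: List[TaskStub], name: str) -> TaskStub:
    matches = get_tasks_by_name(tasks, name)
    if not matches:
        raise LookupError(f"no task named {name!r}")
    # Sets are unordered, so which duplicate comes back is unspecified.
    return matches.pop()


tasks = [TaskStub("extract", 1), TaskStub("load", 2), TaskStub("load", 3)]
print(get_one_task_by_name(tasks, "load").code in {2, 3})  # True
```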

get_task(code: str) → Task

Get the task object with the given code from the process definition.

get_tasks_by_name(name: str) → Set[Task]

Get task objects by name; all tasks with this name are returned.

run()

Submit and start the ProcessDefinition instance.

Shortcut for calling submit() and then start(). Currently only manually started workflows are supported; scheduled runs are not yet supported.

start() → None

Create and start a ProcessDefinition instance.

This posts a start-process-instance request to the Java gateway.

submit() → int

Submit the ProcessDefinition instance to the Java gateway.

_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'_project', '_tenant', 'description', 'name', 'param', 'release_state', 'task_definition_json', 'task_relation_json', 'tasks', 'timeout', 'worker_group'}
_KEY_ATTR: set = {'name', 'param', 'project', 'release_state', 'tenant'}
property end_time: Any

Get attribute end_time.

property param_json: List[Dict] | None

Return the param JSON based on self.param.

property project: Project

Get attribute project.

property schedule_json: Dict | None

Get the schedule parameter JSON object, as required by the Java gateway interface.

property start_time: Any

Get attribute start_time.

property task_definition_json: List[Dict]

Return all tasks definition in list of dict.

property task_list: List[Task]

Return list of tasks objects.

property task_location: List[Dict]

Return the locations of all tasks in the process definition.

For now, every task location is set to x = 0 and y = 0, because there is not yet a good way to lay out tasks. The Java gateway interface requires this field.
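The placeholder layout amounts to one entry per task, all at the origin. A sketch; the helper is hypothetical and the key name taskCode is an assumption about the gateway's field name.

```python
from typing import Dict, List


def task_location(task_codes: List[int]) -> List[Dict[str, int]]:
    """Place every task at the origin (x = 0, y = 0)."""
    return [{"taskCode": code, "x": 0, "y": 0} for code in task_codes]


print(task_location([101, 102]))
# [{'taskCode': 101, 'x': 0, 'y': 0}, {'taskCode': 102, 'x': 0, 'y': 0}]
```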

property task_relation_json: List[Dict]

Return all relations between task pairs as a list of dicts.

property tenant: Tenant

Get attribute tenant.

property user: User

Get user object.

For now, the user is read from the Python side rather than from the Java gateway, so it may not be accurate.

class pydolphinscheduler.core.Task(name: str, task_type: str, description: str | None = None, flag: str | None = 'YES', task_priority: str | None = 'MEDIUM', worker_group: str | None = 'default', delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_flag: int | None = 'CLOSE', timeout_notify_strategy: Optional = None, timeout: int | None = 0, process_definition: ProcessDefinition | None = None, local_params: List | None = None, resource_list: List | None = None, dependence: Dict | None = None, wait_start_timeout: Dict | None = None, condition_result: Dict | None = None)

Bases: Base

Task object; the parent class of all concrete task types.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.
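The get-or-create behavior can be sketched with an in-memory registry standing in for the Java gateway. The class name and the counter starting at 1000 are hypothetical; the real gateway allocates codes its own way.

```python
import itertools
from typing import Dict, Tuple


class CodeRegistrySketch:
    """Hypothetical in-memory stand-in for the gateway's code/version lookup."""

    def __init__(self) -> None:
        self._known: Dict[str, Tuple[int, int]] = {}
        self._next_code = itertools.count(1000)

    def gen_code_and_version(self, task_name: str) -> Tuple[int, int]:
        if task_name not in self._known:
            # New task name: allocate a fresh code and start at version 0.
            self._known[task_name] = (next(self._next_code), 0)
        # Existing task name: return the stored code and version unchanged.
        return self._known[task_name]


registry = CodeRegistrySketch()
print(registry.gen_code_and_version("extract"))  # (1000, 0)
print(registry.gen_code_and_version("load"))     # (1001, 0)
print(registry.gen_code_and_version("extract"))  # (1000, 0)
```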

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.
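The bookkeeping behind set_upstream() and set_downstream() can be sketched with a hypothetical DagTask that records each edge on both ends, mirroring the _upstream_task_codes and _downstream_task_codes sets listed for task classes. It omits the TaskRelation objects the library also creates, so treat it as an illustration only.

```python
from typing import Sequence, Set, Union


class DagTask:
    """Hypothetical task that tracks its neighbours by task code."""

    def __init__(self, name: str, code: int) -> None:
        self.name = name
        self.code = code
        self._upstream_task_codes: Set[int] = set()
        self._downstream_task_codes: Set[int] = set()

    def _set_deps(self, tasks: Union["DagTask", Sequence["DagTask"]], upstream: bool = True) -> None:
        if isinstance(tasks, DagTask):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                # `task` runs before self: record the edge on both ends.
                self._upstream_task_codes.add(task.code)
                task._downstream_task_codes.add(self.code)
            else:
                # `task` runs after self.
                self._downstream_task_codes.add(task.code)
                task._upstream_task_codes.add(self.code)

    def set_upstream(self, tasks: Union["DagTask", Sequence["DagTask"]]) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks: Union["DagTask", Sequence["DagTask"]]) -> None:
        self._set_deps(tasks, upstream=False)


extract, clean, load = DagTask("extract", 1), DagTask("clean", 2), DagTask("load", 3)
extract.set_downstream([clean, load])
print(sorted(extract._downstream_task_codes), clean._upstream_task_codes)  # [2, 3] {1}
```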

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_task_custom_attr: set = {}
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict | None

Get the task parameter object.

The result combines the attributes listed in _task_custom_attr and custom_attr.
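How the two attribute sets combine can be sketched with a hypothetical shell-like task. In the library task_params is a property; here it is a method only so the extra custom_attr set can be passed in. The camelCase conversion and the sorted key order are assumptions made for a readable, deterministic demo.

```python
from typing import Any, Dict, Optional, Set


def snake2camel(name: str) -> str:
    """Convert snake_case to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


class ShellLikeTask:
    """Hypothetical task showing how task_params merges attribute sets."""

    _task_custom_attr: Set[str] = {"raw_script"}

    def __init__(self, command: str) -> None:
        self.raw_script = command
        self.local_params: list = []

    def task_params(self, custom_attr: Optional[Set[str]] = None) -> Dict[str, Any]:
        # Union of any extra attributes and the class-level custom attributes;
        # a new set is built so the class attribute is never mutated.
        attrs = set(custom_attr or set()) | self._task_custom_attr
        return {snake2camel(attr): getattr(self, attr, None) for attr in sorted(attrs)}


task = ShellLikeTask("echo hello")
print(task.task_params({"local_params"}))  # {'localParams': [], 'rawScript': 'echo hello'}
```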

Sides

Init Side package. The Side package holds objects that are related to DolphinScheduler but are not part of the Core package.

class pydolphinscheduler.side.Project(name: str = 'project-pydolphin', description: str | None = None)

Bases: BaseSide

DolphinScheduler Project object.

create_if_not_exists(user='userPythonGateway') → None

Create the Project if it does not exist.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.side.Queue(name: str = 'queuePythonGateway', description: str | None = '')

Bases: BaseSide

DolphinScheduler Queue object.

create_if_not_exists(user='userPythonGateway') → None

Create the Queue if it does not exist.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.side.Tenant(name: str = 'tenant_pydolphin', queue: str = 'queuePythonGateway', description: str | None = None)

Bases: BaseSide

DolphinScheduler Tenant object.

create_if_not_exists(queue_name: str, user='userPythonGateway') → None

Create the Tenant if it does not exist.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.side.User(name: str, password: str, email: str, phone: str, tenant: str, queue: str | None = None, status: int | None = 1)

Bases: BaseSide

DolphinScheduler User object.

create_if_not_exists(**kwargs)

Create the User if it does not exist.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'email', 'name', 'password', 'phone', 'queue', 'status', 'tenant'}
class pydolphinscheduler.side.WorkerGroup(name: str, address: str, description: str | None = None)

Bases: BaseSide

DolphinScheduler Worker Group object.

classmethod create_if_not_exists(user='userPythonGateway')

Create Base if it does not exist.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}

Tasks

Init pydolphinscheduler.tasks package.

class pydolphinscheduler.tasks.Condition(name: str, condition: ConditionOperator, success_task: Task, failed_task: Task, *args, **kwargs)

Bases: Task

Task condition object; declares the behavior of a condition task to DolphinScheduler.

_set_dep() → None

Set upstream tasks according to the condition parameter.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get the condition result definition for the Java gateway.
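Where DEFAULT_CONDITION_RESULT holds empty placeholders, a Condition task can fill each branch with a real task code. A sketch, assuming the codes of success_task and failed_task are placed in the successNode and failedNode lists respectively; the helper function is hypothetical.

```python
from typing import Dict, List


def condition_result(success_code: int, failed_code: int) -> Dict[str, List[int]]:
    """Route to one task when the condition holds and to another when it fails."""
    return {"successNode": [success_code], "failedNode": [failed_code]}


print(condition_result(201, 202))  # {'successNode': [201], 'failedNode': [202]}
```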

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for Condition task.

The Condition task has a special attribute, dependence; in most other tasks this attribute is None and defaults to an empty dict {}. The class attribute _task_custom_attr is not used here, to avoid the attribute being overwritten.

class pydolphinscheduler.tasks.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: List[str] | None = None, post_statements: List[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)

Bases: Task

Task DataX object; declares the behavior of a DataX task to DolphinScheduler.

It runs a DataX job across multiple SQL-like engines, such as:

  • MySQL
  • Oracle
  • PostgreSQL
  • SQLServer

The datasource_name and datatarget_name you provide carry the connection information; they determine which database types and database instances the data is synchronized between.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

CUSTOM_CONFIG = 0
DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for the DataX task.

The DataX task has some special attributes for task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.

class pydolphinscheduler.tasks.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)

Bases: Task

Task dependent object; declares the behavior of a dependent task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for the dependent task.

The Dependent task has a special attribute, dependence; in most other tasks this attribute is None and defaults to an empty dict {}. The class attribute _task_custom_attr is not used here, to avoid the attribute being overwritten.
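The override pattern described above can be sketched with two hypothetical classes: the subclass starts from the parent's parameters and adds dependence itself instead of listing it in a class-level set. The class names and the contents of the dependence dict are illustrative assumptions, not the library's code.

```python
from typing import Any, Dict


class BaseTaskSketch:
    """Hypothetical base class whose task_params holds only generic attributes."""

    def __init__(self) -> None:
        self.local_params: list = []

    @property
    def task_params(self) -> Dict[str, Any]:
        # A fresh dict on every call, so subclasses can safely extend it.
        return {"localParams": self.local_params}


class DependentSketch(BaseTaskSketch):
    """Hypothetical subclass that injects 'dependence' in an overridden property."""

    def __init__(self, dependence: Dict[str, Any]) -> None:
        super().__init__()
        self.dependence = dependence

    @property
    def task_params(self) -> Dict[str, Any]:
        params = super().task_params  # start from the generic parameters
        params["dependence"] = self.dependence
        return params


print(DependentSketch({"relation": "AND", "dependTaskList": []}).task_params)
# {'localParams': [], 'dependence': {'relation': 'AND', 'dependTaskList': []}}
```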

class pydolphinscheduler.tasks.Flink

Bases: Engine

Task Flink object; declares the behavior of a Flink task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

get_jar_id() → int

Get the jar id from the Java gateway; a wrapper for get_resource_info().

get_resource_info(program_type, main_package)

Get resource information from the Java gateway, including the resource id and name.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'app_name', 'deploy_mode', 'flink_version', 'job_manager_memory', 'main_args', 'others', 'parallelism', 'slot', 'task_manager', 'task_manager_memory'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for engine child tasks.

Engine child tasks have some special attributes for task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.

class pydolphinscheduler.tasks.Http(name: str, url: str, http_method: str | None = 'GET', http_params: str | None = None, http_check_condition: str | None = 'STATUS_CODE_DEFAULT', condition: str | None = None, connect_timeout: int | None = 60000, socket_timeout: int | None = 60000, *args, **kwargs)

Bases: Task

Task HTTP object; declares the behavior of an HTTP task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'condition', 'connect_timeout', 'http_check_condition', 'http_method', 'http_params', 'socket_timeout', 'url'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict | None

Get the task parameter object.

The result combines the attributes listed in _task_custom_attr and custom_attr.

class pydolphinscheduler.tasks.MR(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', app_name: str | None = None, main_args: str | None = None, others: str | None = None, *args, **kwargs)

Bases: Engine

Task MR object; declares the behavior of an MR task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

get_jar_id() → int

Get the jar id from the Java gateway; a wrapper for get_resource_info().

get_resource_info(program_type, main_package)

Get resource information from the Java gateway, including the resource id and name.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'app_name', 'main_args', 'others'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for engine child tasks.

Engine child tasks have some special attributes for task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.

class pydolphinscheduler.tasks.Procedure(name: str, datasource_name: str, method: str, *args, **kwargs)

Bases: Task

Task Procedure object; declares the behavior of a Procedure task to DolphinScheduler.

It runs a database procedure job on one of several SQL-like engines, such as:

  • ClickHouse
  • DB2
  • HIVE
  • MySQL
  • Oracle
  • PostgreSQL
  • Presto
  • SQLServer

The datasource_name you provide carries the connection information and determines which database type and database instance runs this SQL.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'method'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for the procedure task.

The procedure task has some special attributes for task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.

class pydolphinscheduler.tasks.Python(name: str, code: Any, *args, **kwargs)

Bases: Task

Task Python object; declares the behavior of a Python task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) → None

Set the given tasks as dependencies of the current task.

set_upstream() and set_downstream() are wrappers around this method.

gen_code_and_version() → Tuple

Generate the task code and version from the Java gateway.

If no task with this name exists in the process definition yet, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) → Dict

Get the object's definition attributes for communication with the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) → Dict

Get the object's definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) → None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'raw_script'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property raw_script: str

Get the raw_script attribute defined for the Python task.

property task_params: Dict | None

Get the task parameter object.

The result combines the attributes listed in _task_custom_attr and custom_attr.

class pydolphinscheduler.tasks.Shell(name: str, command: str, *args, **kwargs)

Bases: Task

Task shell object; declares the behavior of a shell task to DolphinScheduler.

Parameters:
  • name – A unique, meaningful string for the shell task.

  • command

    One or more commands to run in this task.

    It can be a single command:

    Shell(name=..., command="echo task shell")

    or several commands for a more complex task:

    command = '''echo task shell step 1;
    echo task shell step 2;
    echo task shell step 3
    '''

    Shell(name=..., command=command)

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

it is a wrapper for set_upstream() and set_downstream().

gen_code_and_version() Tuple

Generate the task code and version via the Java gateway.

If the task name does not yet exist in the process definition, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.
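The lookup-or-create rule can be sketched with an in-memory registry standing in for the Java gateway. FakeGateway, its method name, and the starting code 1000 are hypothetical; only the rule itself (a new name gets a fresh code at version 0, a known name returns the stored pair) comes from the description above.

```python
from typing import Dict, Tuple


class FakeGateway:
    """Hypothetical in-memory stand-in for the Java gateway's code registry."""

    def __init__(self) -> None:
        self._next_code = 1000
        # (process definition name, task name) -> (code, version)
        self._registry: Dict[Tuple[str, str], Tuple[int, int]] = {}

    def get_code_and_version(self, process_name: str, task_name: str) -> Tuple[int, int]:
        key = (process_name, task_name)
        if key not in self._registry:
            # Unknown task name: allocate a fresh code, starting at version 0.
            self._registry[key] = (self._next_code, 0)
            self._next_code += 1
        # Known task name: return the stored code and version unchanged.
        return self._registry[key]


gateway = FakeGateway()
first = gateway.get_code_and_version("etl", "extract")
again = gateway.get_code_and_version("etl", "extract")
other = gateway.get_code_and_version("etl", "load")
```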

get_define(camel_attr: bool = True) Dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
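A minimal sketch of a get_define-style export, assuming camel_attr=True means snake_case attribute names are converted to camelCase keys. SketchTask, snake_to_camel, and the three-element _DEFINE_ATTR are hypothetical; attributes not listed in _DEFINE_ATTR are simply left out.

```python
from typing import Any, Dict, Set


def snake_to_camel(name: str) -> str:
    # "fail_retry_times" -> "failRetryTimes"
    first, *rest = name.split("_")
    return first + "".join(part.capitalize() for part in rest)


class SketchTask:
    """Hypothetical object; only attributes named in _DEFINE_ATTR are exported."""

    _DEFINE_ATTR: Set[str] = {"name", "task_type", "fail_retry_times"}

    def __init__(self, name: str, task_type: str, fail_retry_times: int = 0) -> None:
        self.name = name
        self.task_type = task_type
        self.fail_retry_times = fail_retry_times
        self.internal_note = "kept local"  # not in _DEFINE_ATTR, so never exported

    def get_define(self, camel_attr: bool = True) -> Dict[str, Any]:
        define: Dict[str, Any] = {}
        for attr in self._DEFINE_ATTR:
            key = snake_to_camel(attr) if camel_attr else attr
            define[key] = getattr(self, attr)
        return define


task = SketchTask("demo", "SHELL", fail_retry_times=3)
```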

get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict

Get the object definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'raw_script'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict | None

Get task parameter object.

The result combines the task-specific attributes in _task_custom_attr with the common custom_attr set.

class pydolphinscheduler.tasks.Spark(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', spark_version: SparkVersion | None = 'SPARK2', app_name: str | None = None, driver_cores: int | None = 1, driver_memory: str | None = '512M', num_executors: int | None = 2, executor_memory: str | None = '2G', executor_cores: int | None = 2, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]

Bases: Engine

Spark task object; declares the behavior of a Spark task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

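Set the given tasks as dependencies of the current task.

It is the shared implementation behind set_upstream() and set_downstream(): upstream=True adds the tasks as upstream, upstream=False as downstream.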

gen_code_and_version() Tuple

Generate the task code and version via the Java gateway.

If the task name does not yet exist in the process definition, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) Dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict

Get the object definition attributes for the given attribute set.

get_jar_id() int

Get the jar id from the Java gateway; a wrapper for get_resource_info().

get_resource_info(program_type, main_package)

Get resource info, including the resource id and name, from the Java gateway.

set_downstream(tasks: Task | Sequence[Task]) None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'app_name', 'deploy_mode', 'driver_cores', 'driver_memory', 'executor_cores', 'executor_memory', 'main_args', 'num_executors', 'others', 'spark_version'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for engine child tasks.

Engine child tasks have some special attributes in task_params that would be awkward to expose directly as Python properties, so Task.task_params is overridden here.

class pydolphinscheduler.tasks.Sql(name: str, datasource_name: str, sql: str, pre_statements: str | None = None, post_statements: str | None = None, display_rows: int | None = 10, *args, **kwargs)[source]

Bases: Task

SQL task object; declares the behavior of a SQL task to DolphinScheduler.

It runs SQL jobs on multiple SQL-like engines, such as:

  • ClickHouse
  • DB2
  • Hive
  • MySQL
  • Oracle
  • PostgreSQL
  • Presto
  • SQL Server

The datasource_name you provide carries the connection information and determines which database type and database instance run this SQL.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

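Set the given tasks as dependencies of the current task.

It is the shared implementation behind set_upstream() and set_downstream(): upstream=True adds the tasks as upstream, upstream=False as downstream.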

gen_code_and_version() Tuple

Generate the task code and version via the Java gateway.

If the task name does not yet exist in the process definition, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) Dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict

Get the object definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property sql_type: int

Determine the SQL type, using a regular expression to check which kind of statement the SQL is.
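The regular-expression check can be sketched as follows, assuming the encoding 0 for queries that return rows and 1 for all other statements. The keyword list and the function name sql_type are illustrative assumptions, not the library's actual pattern; a keyword check like this misclassifies, for example, a SELECT whose string literal contains the word delete.

```python
import re

# Assumed encoding: 0 for statements that return rows, 1 for everything else.
SQL_TYPE_SELECT = 0
SQL_TYPE_NOT_SELECT = 1

# Illustrative keyword list (an assumption): any of these whole words marks
# the statement as a non-query. \b keeps "updated_at" from matching "update".
_NOT_SELECT = re.compile(
    r"\b(insert|update|delete|drop|alter|create|truncate)\b", re.IGNORECASE
)


def sql_type(sql: str) -> int:
    """Return SQL_TYPE_NOT_SELECT if a modifying keyword appears, else SQL_TYPE_SELECT."""
    return SQL_TYPE_NOT_SELECT if _NOT_SELECT.search(sql) else SQL_TYPE_SELECT
```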

property task_params: Dict

Override Task.task_params for the SQL task.

The SQL task has some special attributes in task_params that would be awkward to expose directly as Python properties, so Task.task_params is overridden here.

class pydolphinscheduler.tasks.SubProcess(name: str, process_definition_name: str, *args, **kwargs)[source]

Bases: Task

SubProcess task object; declares the behavior of a SubProcess task to DolphinScheduler.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

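Set the given tasks as dependencies of the current task.

It is the shared implementation behind set_upstream() and set_downstream(): upstream=True adds the tasks as upstream, upstream=False as downstream.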

gen_code_and_version() Tuple

Generate the task code and version via the Java gateway.

If the task name does not yet exist in the process definition, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) Dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict

Get the object definition attributes for the given attribute set.

get_process_definition_info(process_definition_name: str) Dict[source]

Get process definition info, including the process definition id, name and code, from the Java gateway.

set_downstream(tasks: Task | Sequence[Task]) None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {'process_definition_code'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property process_definition_code: str

Get the process definition code; a wrapper for get_process_definition_info().

property task_params: Dict | None

Get task parameter object.

The result combines the task-specific attributes in _task_custom_attr with the common custom_attr set.

class pydolphinscheduler.tasks.Switch(name: str, condition: SwitchCondition, *args, **kwargs)[source]

Bases: Task

Switch task object; declares the behavior of a switch task to DolphinScheduler.

_set_dep() None[source]

Set downstream tasks according to the condition parameter.
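A minimal sketch of the idea, assuming every branch target (including a default branch) becomes a downstream task of the switch. Node, Branch, SketchSwitchCondition, and set_dep are hypothetical stand-ins, not the library's SwitchCondition API.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class Node:
    """Hypothetical task node that only records downstream task names."""
    name: str
    downstream: Set[str] = field(default_factory=set)

    def set_downstream(self, tasks: List["Node"]) -> None:
        self.downstream.update(task.name for task in tasks)


@dataclass
class Branch:
    """Hypothetical branch: an expression and the task to run when it holds."""
    condition: str
    task: Node


@dataclass
class SketchSwitchCondition:
    branches: List[Branch]
    default: Optional[Node] = None

    def targets(self) -> List[Node]:
        # Every task the switch could jump to, including the default branch.
        nodes = [branch.task for branch in self.branches]
        if self.default is not None:
            nodes.append(self.default)
        return nodes


def set_dep(switch: Node, condition: SketchSwitchCondition) -> None:
    # Every branch target becomes downstream of the switch task.
    switch.set_downstream(condition.targets())


switch = Node("switch")
cond = SketchSwitchCondition(
    branches=[Branch("${var} > 1", Node("big")), Branch("${var} <= 1", Node("small"))],
    default=Node("fallback"),
)
set_dep(switch, cond)
```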

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

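Set the given tasks as dependencies of the current task.

It is the shared implementation behind set_upstream() and set_downstream(): upstream=True adds the tasks as upstream, upstream=False as downstream.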

gen_code_and_version() Tuple

Generate the task code and version via the Java gateway.

If the task name does not yet exist in the process definition, the Java gateway generates a new code with version 0; otherwise, it returns the existing code and version.

get_define(camel_attr: bool = True) Dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict

Get the object definition attributes for the given attribute set.

set_downstream(tasks: Task | Sequence[Task]) None

Set the given tasks as downstream of the current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set the given tasks as upstream of the current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: Dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: Set[int]
_task_custom_attr: set = {}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
property condition_result: Dict

Get attribute condition_result.

property process_definition: ProcessDefinition | None

Get attribute process_definition.

property task_params: Dict

Override Task.task_params for the switch task.

The switch task has a special attribute, switch; most other tasks leave it as None and use an empty dict {} as the default value. It is not listed in the class attribute _task_custom_attr, to avoid the attribute being overwritten.

Constants

Constants for pydolphinscheduler.

class pydolphinscheduler.constants.DefaultTaskCodeNum[source]

Bases: str

Constants and default values for the default task code number.

DEFAULT = 1
class pydolphinscheduler.constants.Delimiter[source]

Bases: str

Constants for delimiter.

BAR = '-'
COLON = ':'
DASH = '/'
DIRECTION = '->'
UNDERSCORE = '_'
class pydolphinscheduler.constants.JavaGatewayDefault[source]

Bases: str

Constants and default values for the Java gateway.

AUTO_CONVERT = True
RESULT_DATA = 'data'
RESULT_MESSAGE_KEYWORD = 'msg'
RESULT_MESSAGE_SUCCESS = 'success'
RESULT_STATUS_KEYWORD = 'status'
RESULT_STATUS_SUCCESS = 'SUCCESS'
SERVER_ADDRESS = '127.0.0.1'
SERVER_PORT = 25333
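The RESULT_* constants suggest how a gateway response is checked. The following sketch assumes a response dictionary shaped like {"status": ..., "msg": ..., "data": ...}, using the key names listed above; unwrap_result and GatewayResultError are hypothetical names.

```python
from typing import Any, Dict

# Values copied from JavaGatewayDefault above.
RESULT_STATUS_KEYWORD = "status"
RESULT_STATUS_SUCCESS = "SUCCESS"
RESULT_MESSAGE_KEYWORD = "msg"
RESULT_DATA = "data"


class GatewayResultError(Exception):
    """Hypothetical error type for a failed gateway call."""


def unwrap_result(result: Dict[str, Any]) -> Any:
    # Assumed response shape: {"status": ..., "msg": ..., "data": ...}.
    if result.get(RESULT_STATUS_KEYWORD) != RESULT_STATUS_SUCCESS:
        raise GatewayResultError(result.get(RESULT_MESSAGE_KEYWORD, "unknown error"))
    return result.get(RESULT_DATA)
```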
class pydolphinscheduler.constants.ProcessDefinitionDefault[source]

Bases: object

Default values for pydolphinscheduler.core.process_definition.ProcessDefinition.

PROJECT: str = 'project-pydolphin'
QUEUE: str = 'queuePythonGateway'
TENANT: str = 'tenant_pydolphin'
TIME_ZONE: str = 'Asia/Shanghai'
USER: str = 'userPythonGateway'
USER_EMAIL: str = 'userPythonGateway@dolphinscheduler.com'
USER_PHONE: str = '11111111111'
USER_PWD: str = 'userPythonGateway'
USER_STATE: int = 1
WORKER_GROUP: str = 'default'
class pydolphinscheduler.constants.ProcessDefinitionReleaseState[source]

Bases: object

Constants for pydolphinscheduler.core.process_definition.ProcessDefinition release state.

OFFLINE: str = 'OFFLINE'
ONLINE: str = 'ONLINE'
class pydolphinscheduler.constants.TaskFlag[source]

Bases: str

Constants for task flag.

NO = 'NO'
YES = 'YES'
class pydolphinscheduler.constants.TaskPriority[source]

Bases: str

Constants for task priority.

HIGH = 'HIGH'
HIGHEST = 'HIGHEST'
LOW = 'LOW'
LOWEST = 'LOWEST'
MEDIUM = 'MEDIUM'
class pydolphinscheduler.constants.TaskTimeoutFlag[source]

Bases: str

Constants for task timeout flag.

CLOSE = 'CLOSE'
class pydolphinscheduler.constants.TaskType[source]

Bases: str

Constants for task types; this also shows which task types are currently supported.

CONDITIONS = 'CONDITIONS'
DATAX = 'DATAX'
DEPENDENT = 'DEPENDENT'
HTTP = 'HTTP'
MR = 'MR'
PROCEDURE = 'PROCEDURE'
PYTHON = 'PYTHON'
SHELL = 'SHELL'
SPARK = 'SPARK'
SQL = 'SQL'
SUB_PROCESS = 'SUB_PROCESS'
SWITCH = 'SWITCH'
class pydolphinscheduler.constants.Time[source]

Bases: str

Constants for date and time formats.

FMT_DASH_DATE = '%Y/%m/%d'
FMT_NO_COLON_TIME = '%H%M%S'
FMT_SHORT_DATE = '%Y%m%d'
FMT_STD_DATE = '%Y-%m-%d'
FMT_STD_TIME = '%H:%M:%S'
LEN_SHORT_DATE = 8
LEN_STD_DATE = 10
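The LEN_* constants make it possible to pick a date format by string length. A minimal sketch that copies the constant values above into a standalone function; parse_date is a hypothetical helper, not a library function.

```python
from datetime import datetime

# Values copied from the Time constants above.
FMT_SHORT_DATE = "%Y%m%d"
FMT_STD_DATE = "%Y-%m-%d"
LEN_SHORT_DATE = 8
LEN_STD_DATE = 10


def parse_date(text: str) -> datetime:
    # Pick the format by length: 8 chars like "20211231", 10 chars like "2021-12-31".
    if len(text) == LEN_SHORT_DATE:
        return datetime.strptime(text, FMT_SHORT_DATE)
    if len(text) == LEN_STD_DATE:
        return datetime.strptime(text, FMT_STD_DATE)
    raise ValueError(f"unsupported date string: {text!r}")
```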

Exceptions

Exceptions for pydolphinscheduler.

exception pydolphinscheduler.exceptions.PyDSBaseException[source]

Bases: Exception

Base exception for pydolphinscheduler.

exception pydolphinscheduler.exceptions.PyDSJavaGatewayException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler Java gateway error.

exception pydolphinscheduler.exceptions.PyDSParamException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler parameter verification errors.

exception pydolphinscheduler.exceptions.PyDSProcessDefinitionNotAssignException[source]

Bases: PyDSBaseException

Exception raised when a pydolphinscheduler process definition is not assigned.

exception pydolphinscheduler.exceptions.PyDSTaskNoFoundException[source]

Bases: PyDSBaseException

Exception raised when a pydolphinscheduler workflow task is not found.
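Because every exception above derives from PyDSBaseException, catching the base class handles any of them. To keep this sketch runnable without the package, it redefines part of the hierarchy locally (in real code you would import these classes from pydolphinscheduler.exceptions); find_task and safe_find are hypothetical helpers.

```python
class PyDSBaseException(Exception):
    """Local mirror of the documented base exception."""


class PyDSParamException(PyDSBaseException):
    """Local mirror of the parameter verification error."""


class PyDSTaskNoFoundException(PyDSBaseException):
    """Local mirror of the task-not-found error."""


def find_task(tasks: dict, name: str):
    # Hypothetical lookup helper that raises the task-not-found error.
    if name not in tasks:
        raise PyDSTaskNoFoundException(f"task {name!r} not found")
    return tasks[name]


def safe_find(tasks: dict, name: str):
    try:
        return find_task(tasks, name)
    except PyDSBaseException as err:
        # Catching the base class covers every subclass in the hierarchy.
        return f"error: {err}"
```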