API
Core
Init pydolphinscheduler.core package.
- class pydolphinscheduler.core.Database(database_name: str, type_key, database_key, *args, **kwargs)[source]
Bases: dict
Database object, used to get information about a database.
You provide a database_name that contains the connection information; it determines which database type and database instance the task runs against.
- clear() -> None. Remove all items from D.
- copy() -> a shallow copy of D
- classmethod fromkeys(iterable, value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- get_database_info(name) Dict[source]
Get database info from java gateway, contains database id, type, name.
- items() -> a set-like object providing a view on D's items
- keys() -> a set-like object providing a view on D's keys
- pop(k[, d]) -> v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) -> None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v
In either case, this is followed by: for k in F: D[k] = F[k]
- values() -> an object providing a view on D's values
- property database_id: str
Get database id from java gateway, a wrapper for get_database_info().
- property database_type: str
Get database type from java gateway, a wrapper for get_database_info().
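The pattern described for Database (a dict subclass whose entries and properties come from one lookup) can be sketched as below. This is only an illustration under stated assumptions: DatabaseSketch is a hypothetical stand-in, the lookup returns a canned response instead of calling the Java gateway, and storing the values under type_key and database_key is a guess at how those constructor arguments are used.

```python
from typing import Dict


class DatabaseSketch(dict):
    """Hypothetical stand-in for Database: a dict filled from one lookup."""

    def __init__(self, database_name: str, type_key: str, database_key: str):
        super().__init__()
        self._database_name = database_name
        # Assumption: the looked-up values are stored under the caller-chosen keys.
        self[type_key] = self.database_type
        self[database_key] = self.database_id

    def get_database_info(self, name: str) -> Dict:
        # Assumption: canned response; the real method asks the Java gateway.
        return {"id": 1, "type": "MYSQL", "name": name}

    @property
    def database_id(self):
        # Thin wrapper around get_database_info(), as the listing describes.
        return self.get_database_info(self._database_name)["id"]

    @property
    def database_type(self):
        return self.get_database_info(self._database_name)["type"]


db = DatabaseSketch("mysql_prod", type_key="type", database_key="datasource")
```

Because the object is still a dict, the inherited methods listed above (get, items, update and so on) work on it unchanged.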
- class pydolphinscheduler.core.Engine(name: str, task_type: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', *args, **kwargs)[source]
Bases: Task
Task engine object, declares the behavior of engine tasks for dolphinscheduler.
This is the parent class of spark, flink and mr tasks, and is used to provide the programType, mainClass and mainJar task parameters for reuse.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- get_jar_id() int[source]
Get jar id from java gateway, a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)[source]
Get resource info from java gateway, contains resource id, name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _task_custom_attr: set = {}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for engine children tasks.
Children tasks have some special attributes for task_params, and it would be odd to set them directly as Python properties, so we override Task.task_params here.
- property user_name: str | None
Return user name of process definition.
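The _get_attr() rule listed above (start from _task_default_attr, add _task_custom_attr, remove _task_ignore_attr) amounts to plain set arithmetic. A minimal sketch follows; the custom and ignore values are hypothetical, and applying the union before the difference is an assumption that only matters when a name appears in both the custom and the ignore set.

```python
from typing import Set

# Copied from the listing above.
TASK_DEFAULT_ATTR = {
    "condition_result",
    "dependence",
    "local_params",
    "resource_list",
    "wait_start_timeout",
}


def get_attr_sketch(default: Set[str], custom: Set[str], ignore: Set[str]) -> Set[str]:
    """Combine the three attribute sets into the final task_params names."""
    # Assumption: union first, then difference.
    return (default | custom) - ignore


# Hypothetical values for an engine-style task.
custom_attr = {"main_class", "program_type"}
ignore_attr = {"dependence"}

final_attr = get_attr_sketch(TASK_DEFAULT_ATTR, custom_attr, ignore_attr)
```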
- class pydolphinscheduler.core.ProcessDefinition(name: str, description: str | None = None, schedule: str | None = None, start_time: str | None = None, end_time: str | None = None, timezone: str | None = 'Asia/Shanghai', user: str | None = 'userPythonGateway', project: str | None = 'project-pydolphin', tenant: str | None = 'tenant_pydolphin', worker_group: str | None = 'default', warning_type: str | None = 'NONE', warning_group_id: int | None = 0, timeout: int | None = 0, release_state: str | None = 'online', param: Dict | None = None, resource_list: List[Resource] | None = None)[source]
Bases: Base
Process definition object, defines the process definition attributes, tasks, and relations.
TODO: maybe we should rename this class, currently use DS object name.
- Parameters:
user – The user for the current process definition. A new user is created if it does not exist. If your parameter project already exists but the project's creator is not user, project is granted to user automatically.
project – The project for the current process definition. You can see the workflow in this project through the Web UI after it is submitted with submit() or run(). A new project belonging to user is created if it does not exist. When project exists but the project's creator is not user, project is granted to user automatically.
resource_list – Resource files required by the current process definition. You can create and modify resource files through this field. When the process definition is submitted, these resource files are submitted along with it.
- _ensure_side_model_exists()[source]
Ensure the process definition's side models exist.
For now, the model objects include pydolphinscheduler.models.project.Project, pydolphinscheduler.models.tenant.Tenant, and pydolphinscheduler.models.user.User. If these models do not exist, they are created with the default values in pydolphinscheduler.constants.ProcessDefinitionDefault.
- _handle_root_relation()[source]
Handle root task property pydolphinscheduler.core.task.TaskRelation.
A root task in the DAG does not have a dominant upstream node, but we have to add exactly one default upstream task with task_code equal to 0. This is required by the java gateway interface.
- _pre_submit_check()[source]
Check that specific conditions are satisfied before submitting.
This method should be called before the process definition is submitted to the java gateway. For now, we have the following check:
* self.param or at least one local param of a task should be set if a switch task is in this workflow.
- add_tasks(tasks: List[Task]) None[source]
Add a task sequence to the process definition; it is a wrapper of add_task().
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- get_one_task_by_name(name: str) Task[source]
Get exactly one task from the process definition by the given name.
The function always returns one task, even if this process definition has more than one task with this name.
- get_tasks_by_name(name: str) Set[Task][source]
Get task objects by the given name; it returns all tasks with this name.
- run()[source]
Submit and start ProcessDefinition instance.
Shortcut for function submit() and function start(). Only manually started workflows are supported for now; scheduled runs are coming soon.
- start() None[source]
Create and start ProcessDefinition instance.
This posts to start-process-instance on the java gateway.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'_project', '_tenant', 'description', 'name', 'param', 'release_state', 'resource_list', 'task_definition_json', 'task_relation_json', 'tasks', 'timeout', 'warning_group_id', 'warning_type', 'worker_group'}
- _KEY_ATTR: set = {'name', 'param', 'project', 'release_state', 'tenant'}
- property end_time: Any
Get attribute end_time.
- property param_json: List[Dict] | None
Return param json based on self.param.
- property release_state: int
Get attribute release_state.
- property schedule_json: Dict | None
Get schedule parameter json object. This is required by the java gateway interface.
- property start_time: Any
Get attribute start_time.
- property task_definition_json: List[Dict]
Return all task definitions as a list of dicts.
- property task_relation_json: List[Dict]
Return all relations between task pairs as a list of dicts.
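The root handling described under _handle_root_relation() can be sketched as follows. This is a minimal illustration rather than the library's implementation: relations are modeled as (upstream code, downstream code) tuples, and add_root_relations is a hypothetical helper name.

```python
from typing import Iterable, Set, Tuple

# (upstream task code, downstream task code)
Relation = Tuple[int, int]

# The placeholder upstream code the listing above says root tasks must use.
ROOT_UPSTREAM_CODE = 0


def add_root_relations(task_codes: Iterable[int], relations: Set[Relation]) -> Set[Relation]:
    """Give every task without an upstream a default upstream with code 0."""
    has_upstream = {downstream for _, downstream in relations}
    roots = [code for code in task_codes if code not in has_upstream]
    return relations | {(ROOT_UPSTREAM_CODE, code) for code in roots}


# A chain 1 -> 2 -> 3 plus an isolated task 4: tasks 1 and 4 are roots.
result = add_root_relations([1, 2, 3, 4], {(1, 2), (2, 3)})
```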
- class pydolphinscheduler.core.Task(name: str, task_type: str, description: str | None = None, flag: str | None = 'YES', task_priority: str | None = 'MEDIUM', worker_group: str | None = 'default', environment_name: str | None = None, delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_flag: int | None = 'CLOSE', timeout_notify_strategy: Optional = None, timeout: int | None = 0, process_definition: ProcessDefinition | None = None, local_params: List | None = None, resource_list: List | None = None, dependence: Dict | None = None, wait_start_timeout: Dict | None = None, condition_result: Dict | None = None)[source]
Bases: Base
Task object, parent class for all concrete task types.
- _get_attr() Set[str][source]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None[source]
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple[source]
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None[source]
Set parameter tasks as downstream to current task.
- set_upstream(tasks: Task | Sequence[Task]) None[source]
Set parameter tasks as upstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _task_custom_attr: set = {}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
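The relationship between set_upstream(), set_downstream() and _set_deps() listed above can be illustrated with a small stand-in class. TaskSketch and its upstream_codes and downstream_codes attributes are hypothetical, and the sketch assumes that both public methods delegate to one shared helper that records the edge on both tasks.

```python
from collections.abc import Sequence
from typing import Set


class TaskSketch:
    """Hypothetical stand-in for Task that only tracks dependency codes."""

    def __init__(self, name: str, code: int):
        self.name = name
        self.code = code
        self.upstream_codes: Set[int] = set()
        self.downstream_codes: Set[int] = set()

    def _set_deps(self, tasks, upstream: bool = True) -> None:
        # Accept a single task or a sequence of tasks.
        if not isinstance(tasks, Sequence):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self.upstream_codes.add(task.code)
                task.downstream_codes.add(self.code)
            else:
                self.downstream_codes.add(task.code)
                task.upstream_codes.add(self.code)

    def set_upstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=False)


a, b, c = TaskSketch("a", 1), TaskSketch("b", 2), TaskSketch("c", 3)
a.set_downstream([b, c])  # a runs before b and c
c.set_upstream(b)         # b also runs before c
```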
Models
Init Models package, keeping objects related to DolphinScheduler converted from the Java Gateway Service.
- class pydolphinscheduler.models.Base(name: str, description: str | None = None)[source]
Bases: object
DolphinScheduler Base object.
- get_define(camel_attr: bool = True) Dict[source]
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict[source]
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
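How get_define() might use _DEFINE_ATTR and camel_attr can be sketched as below. The helper names snake_to_camel and get_define_sketch are hypothetical, and the assumption that camel_attr=True turns snake_case attribute names into camelCase keys is inferred from the parameter name, not confirmed by the listing.

```python
from types import SimpleNamespace
from typing import Any, Dict, Set


def snake_to_camel(name: str) -> str:
    """Convert fail_retry_times to failRetryTimes."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def get_define_sketch(obj: Any, define_attr: Set[str], camel_attr: bool = True) -> Dict[str, Any]:
    """Collect the attributes named in define_attr into a plain dict."""
    result = {}
    for attr in sorted(define_attr):
        key = snake_to_camel(attr) if camel_attr else attr
        # Missing attributes are reported as None in this sketch.
        result[key] = getattr(obj, attr, None)
    return result


task = SimpleNamespace(name="demo", fail_retry_times=3)
camel = get_define_sketch(task, {"name", "fail_retry_times"})
snake = get_define_sketch(task, {"name", "fail_retry_times"}, camel_attr=False)
```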
- class pydolphinscheduler.models.BaseSide(name: str, description: str | None = None)[source]
Bases: Base
Base class for model objects; it declares base behavior for them.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.models.Project(name: str = 'project-pydolphin', description: str | None = None)[source]
Bases: BaseSide
DolphinScheduler Project object.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.models.Queue(name: str = 'queuePythonGateway', description: str | None = '')[source]
Bases: BaseSide
DolphinScheduler Queue object.
- classmethod create_if_not_exists(user='userPythonGateway')
Create Base if it does not exist.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.models.Tenant(name: str = 'tenant_pydolphin', queue: str = 'queuePythonGateway', description: str | None = None)[source]
Bases: BaseSide
DolphinScheduler Tenant object.
- create_if_not_exists(queue_name: str, user='userPythonGateway') None[source]
Create Tenant if it does not exist.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
- class pydolphinscheduler.models.User(name: str, password: str | None = 'userPythonGateway', email: str | None = 'userPythonGateway@dolphinscheduler.com', phone: str | None = '11111111111', tenant: str | None = 'tenant_pydolphin', queue: str | None = 'queuePythonGateway', status: int | None = 1)[source]
Bases: BaseSide
DolphinScheduler User object.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'email', 'name', 'password', 'phone', 'queue', 'status', 'tenant'}
- class pydolphinscheduler.models.WorkerGroup(name: str, address: str, description: str | None = None)[source]
Bases: BaseSide
DolphinScheduler Worker Group object.
- classmethod create_if_not_exists(user='userPythonGateway')
Create Base if it does not exist.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {}
- _KEY_ATTR: set = {'description', 'name'}
Tasks
Init pydolphinscheduler.tasks package.
- class pydolphinscheduler.tasks.Condition(name: str, condition: ConditionOperator, success_task: Task, failed_task: Task, *args, **kwargs)[source]
Bases: Task
Task condition object, declares the behavior of the condition task for dolphinscheduler.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get condition result define for java gateway.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for Condition task.
The Condition task has a special attribute, dependence; in most tasks this attribute is None and an empty dict {} is used as its default value. We do not use the class attribute _task_custom_attr, to avoid the attribute being overwritten.
- property user_name: str | None
Return user name of process definition.
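The shape of condition_result can be read off DEFAULT_CONDITION_RESULT in the listing above. The sketch below fills that shape from a success and a failed branch; build_condition_result is a hypothetical helper, and the use of task codes as the list values is an assumption, not something the listing states.

```python
from typing import Dict, List

# Copied from the listing above: the placeholder used when no branch is set.
DEFAULT_CONDITION_RESULT = {"failedNode": [""], "successNode": [""]}


def build_condition_result(success_code: int, failed_code: int) -> Dict[str, List[int]]:
    """Map each branch name to the code of the task that should run next."""
    # Assumption: each branch holds a one-element list of task codes.
    return {"successNode": [success_code], "failedNode": [failed_code]}


condition_result = build_condition_result(success_code=101, failed_code=102)
```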
- class pydolphinscheduler.tasks.CustomDataX(name: str, json: str, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]
Bases: Task
Task CustomDatax object, declares the behavior of the custom DataX task for dolphinscheduler.
You provide a json template for DataX, and it synchronizes data according to that template.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- CUSTOM_CONFIG = 1
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'custom_config', 'json', 'xms', 'xmx'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.DVCDownload(name: str, repository: str, data_path_in_dvc_repository: str, data_path_in_worker: str, version: str, *args, **kwargs)[source]
Bases: BaseDVC
Task DVC Download object, declares the behavior of the DVC Download task for dolphinscheduler.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_version'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- dvc_task_type = 'Download'
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.DVCInit(name: str, repository: str, store_url: str, *args, **kwargs)[source]
Bases: BaseDVC
Task DVC Init object, declares the behavior of the DVC Init task for dolphinscheduler.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_dvc_attr = {'dvc_store_url'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- dvc_task_type = 'Init DVC'
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.DVCUpload(name: str, repository: str, data_path_in_worker: str, data_path_in_dvc_repository: str, version: str, message: str, *args, **kwargs)[source]
Bases: BaseDVC
Task DVC Upload object, declares the behavior of the DVC Upload task for dolphinscheduler.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_message', 'dvc_version'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- dvc_task_type = 'Upload'
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: List[str] | None = None, post_statements: List[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]
Bases: Task
Task DataX object, declares the behavior of the DataX task for dolphinscheduler.
It can run database DataX jobs against multiple SQL engines, such as MySQL, Oracle, Postgresql, and SQLServer. You provide datasource_name and datatarget_name, which contain the connection information and determine which database types and database instances the data is synchronized between.
- _get_attr() Set[str]
Get final task task_params attribute.
Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between parameter tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes communicated to the Java gateway server.
Uses attribute self._DEFINE_ATTR to determine which attributes should be included when the object tries to communicate with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- CUSTOM_CONFIG = 0
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for datax task.
The DataX task has some special attributes in task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.
- property user_name: str | None
Return user name of process definition.
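The _get_attr() rule described for DataX can be illustrated with plain Python sets. This is a minimal sketch, not the library's implementation: the three sets are copied from the DataX listing, get_attr is a hypothetical helper, and the empty _task_ignore_attr is modelled as set().

```python
# Attribute sets copied from the DataX listing.
task_default_attr = {
    "condition_result", "dependence", "local_params",
    "resource_list", "wait_start_timeout",
}
task_custom_attr = {
    "custom_config", "job_speed_byte", "job_speed_record",
    "post_statements", "pre_statements", "sql", "target_table", "xms", "xmx",
}
task_ignore_attr = set()  # the listing shows {}, i.e. nothing is ignored


def get_attr(default: set, custom: set, ignore: set) -> set:
    """Hypothetical helper: union default and custom names, then drop ignored ones."""
    return (default | custom) - ignore


final_attr = get_attr(task_default_attr, task_custom_attr, task_ignore_attr)
print(sorted(final_attr))
```

With an empty ignore set the result is simply the union of the default and custom names; a subclass that lists a name in _task_ignore_attr would see that name removed.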
- class pydolphinscheduler.tasks.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)[source]
Bases: Task
Task dependent object, declare behavior for dependent task to dolphinscheduler.
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for dependent task.
The dependent task has a special attribute, dependence. For most tasks this attribute is None and an empty dict {} is used as the default value. The class attribute _task_custom_attr is not used for it, to avoid overwriting that attribute.
- property user_name: str | None
Return user name of process definition.
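The relationship between _set_deps(), set_upstream() and set_downstream() can be sketched without the library. The class below is a hypothetical stand-in, not the real Task: it assumes both public methods delegate to one shared helper and that dependencies are tracked as sets of integer task codes, mirroring _upstream_task_codes and _downstream_task_codes.

```python
from collections.abc import Sequence
from typing import Set


class SketchTask:
    """Hypothetical stand-in for Task; it only tracks neighbour codes."""

    def __init__(self, code: int) -> None:
        self.code = code
        self.upstream_task_codes: Set[int] = set()
        self.downstream_task_codes: Set[int] = set()

    def _set_deps(self, tasks, upstream: bool = True) -> None:
        # Accept either a single task or a sequence of tasks.
        if not isinstance(tasks, Sequence):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self.upstream_task_codes.add(task.code)
                task.downstream_task_codes.add(self.code)
            else:
                self.downstream_task_codes.add(task.code)
                task.upstream_task_codes.add(self.code)

    def set_upstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=False)


extract, transform, load = SketchTask(1), SketchTask(2), SketchTask(3)
extract.set_downstream([transform, load])  # extract runs before both
load.set_upstream(transform)               # transform runs before load
print(load.upstream_task_codes)
```

Recording the edge on both tasks keeps the two code sets consistent whichever direction the caller uses.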
- class pydolphinscheduler.tasks.Flink(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', flink_version: FlinkVersion | None = '<1.10', app_name: str | None = None, job_manager_memory: str | None = '1G', task_manager_memory: str | None = '2G', slot: int | None = 1, task_manager: int | None = 2, parallelism: int | None = 1, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]
Bases: Engine
Task flink object, declare behavior for flink task to dolphinscheduler.
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- get_jar_id() int
Get jar id from java gateway, a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)
Get resource info from java gateway, contains resource id, name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'deploy_mode', 'flink_version', 'job_manager_memory', 'main_args', 'others', 'parallelism', 'slot', 'task_manager', 'task_manager_memory'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for engine children task.
Engine child tasks have some special attributes in task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.
- property user_name: str | None
Return user name of process definition.
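The camel_attr flag of get_define() suggests that attribute names are converted from snake_case to camelCase before they reach the Java gateway. The sketch below illustrates that idea only; it assumes this is what the flag means, and snake_to_camel, the free function get_define and the sample values are all hypothetical, not the library's code.

```python
def snake_to_camel(name: str) -> str:
    """Convert snake_case to camelCase, e.g. task_type -> taskType."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def get_define(values: dict, define_attr: set, camel_attr: bool = True) -> dict:
    """Hypothetical helper: keep only names in define_attr, optionally camel-casing keys."""
    return {
        (snake_to_camel(name) if camel_attr else name): values[name]
        for name in sorted(define_attr)
        if name in values
    }


# Illustrative values only; the attribute names come from _DEFINE_ATTR.
values = {
    "name": "flink_job",
    "fail_retry_times": 3,
    "worker_group": "default",
    "not_defined": 1,  # not in define_attr, so it is dropped
}
define_attr = {"name", "fail_retry_times", "worker_group", "task_type"}

print(get_define(values, define_attr))
print(get_define(values, define_attr, camel_attr=False))
```

Filtering by an explicit set of names keeps private or unrelated attributes out of the payload, whichever key style is chosen.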
- class pydolphinscheduler.tasks.Http(name: str, url: str, http_method: str | None = 'GET', http_params: str | None = None, http_check_condition: str | None = 'STATUS_CODE_DEFAULT', condition: str | None = None, connect_timeout: int | None = 60000, socket_timeout: int | None = 60000, *args, **kwargs)[source]
Bases: Task
Task HTTP object, declare behavior for HTTP task to dolphinscheduler.
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'condition', 'connect_timeout', 'http_check_condition', 'http_method', 'http_params', 'socket_timeout', 'url'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
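The contract of gen_code_and_version() (a new code with version 0 for an unknown task name, the existing pair otherwise) can be shown with an in-memory stand-in. FakeGateway is hypothetical: the real call goes to the Java gateway, and the starting code of 1000 is an arbitrary choice for this sketch.

```python
import itertools
from typing import Dict, Tuple


class FakeGateway:
    """Hypothetical in-memory stand-in for the Java gateway."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Tuple[int, int]] = {}
        self._next_code = itertools.count(1000)  # arbitrary starting code

    def gen_code_and_version(self, task_name: str) -> Tuple[int, int]:
        if task_name not in self._tasks:
            # Unknown task name: allocate a new code, version starts at 0.
            self._tasks[task_name] = (next(self._next_code), 0)
        # Known task name: return the stored code and version unchanged.
        return self._tasks[task_name]


gateway = FakeGateway()
first = gateway.gen_code_and_version("http_check")
again = gateway.gen_code_and_version("http_check")
other = gateway.gen_code_and_version("http_other")
print(first, again, other)
```

Asking twice for the same name yields the same pair, which is what lets a workflow be resubmitted without creating duplicate tasks.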
- class pydolphinscheduler.tasks.MLFlowProjectsAutoML(name: str, data_path: str, automl_tool: str | None = 'flaml', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow projects object, declare behavior for AutoML task to dolphinscheduler.
- Parameters:
name – task name
data_path – data path of the MLflow project; supports a git address or a directory on the worker.
automl_tool – the AutoML tool used; currently supports autosklearn and flaml.
mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000
experiment_name – MLflow experiment name, default is empty
model_name – MLflow model name, default is empty
parameters – MLflow project parameters, default is empty
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_mlflow_attr = {'automl_tool', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- mlflow_job_type = 'AutoML'
- mlflow_task_type = 'MLflow Projects'
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
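Because automl_tool accepts only the tools named in the parameter description, a caller may want to check the value before building the task. The helper below is a hypothetical pre-flight check written for this illustration; the library itself may not validate the value, and exact lower-case matching is an assumption.

```python
# Tool names taken from the automl_tool parameter description.
SUPPORTED_AUTOML_TOOLS = {"autosklearn", "flaml"}


def check_automl_tool(automl_tool: str = "flaml") -> str:
    """Hypothetical check: return the tool name or raise ValueError if unsupported."""
    if automl_tool not in SUPPORTED_AUTOML_TOOLS:
        supported = ", ".join(sorted(SUPPORTED_AUTOML_TOOLS))
        raise ValueError(f"unsupported automl_tool {automl_tool!r}; expected one of: {supported}")
    return automl_tool


print(check_automl_tool())               # uses the documented default
print(check_automl_tool("autosklearn"))
```

Failing early on the client side gives a clearer error than a task that fails later on a worker.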
- class pydolphinscheduler.tasks.MLFlowProjectsBasicAlgorithm(name: str, data_path: str, algorithm: str | None = 'lightgbm', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', search_params: str | None = '', *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow projects object, declare behavior for BasicAlgorithm task to dolphinscheduler.
- Parameters:
name – task name
data_path – data path of the MLflow project; supports a git address or a directory on the worker.
algorithm – the selected algorithm; currently supports LR, SVM, LightGBM, and XGBoost, based on the scikit-learn form.
mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000
experiment_name – MLflow experiment name, default is empty
model_name – MLflow model name, default is empty
parameters – MLflow project parameters, default is empty
search_params – Whether to search the parameters, default is empty
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_mlflow_attr = {'algorithm', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model', 'search_params'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- mlflow_job_type = 'BasicAlgorithm'
- mlflow_task_type = 'MLflow Projects'
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.MLFlowProjectsCustom(name: str, repository: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', parameters: str | None = '', version: str | None = 'master', *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow projects object, declare behavior for MLflow Custom projects task to dolphinscheduler.
- Parameters:
name – task name
repository – repository url of the MLflow project; supports a git address or a directory on the worker. If the project is in a subdirectory, append it after # (same as mlflow run), for example https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native.
mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000
experiment_name – MLflow experiment name, default is empty
parameters – MLflow project parameters, default is empty
version – MLflow project version, default is master
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_mlflow_attr = {'experiment_name', 'mlflow_job_type', 'mlflow_project_repository', 'mlflow_project_version', 'params'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- mlflow_job_type = 'CustomProject'
- mlflow_task_type = 'MLflow Projects'
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
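The # convention in the repository parameter separates the project location from an optional subdirectory. The sketch below shows how such a value splits into its two parts; split_repository is a hypothetical helper for illustration and is not part of pydolphinscheduler or MLflow.

```python
from typing import Optional, Tuple


def split_repository(repository: str) -> Tuple[str, Optional[str]]:
    """Hypothetical helper: split 'location#subdirectory' into its two parts."""
    location, _, subdirectory = repository.partition("#")
    # partition() returns an empty string when there is no '#'; map that to None.
    return location, subdirectory or None


# Example value taken from the repository parameter description.
print(split_repository("https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native"))
print(split_repository("https://github.com/mlflow/mlflow"))
```

Using str.partition keeps the behaviour well defined when the value contains no #, since it splits on the first occurrence only.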
- class pydolphinscheduler.tasks.MLflowModels(name: str, model_uri: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', deploy_mode: str | None = 'DOCKER', port: int | None = 7000, *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow models object, declare behavior for MLflow models task to dolphinscheduler.
Deploy machine learning models in diverse serving environments.
- Parameters:
name – task name
model_uri – model URI of MLflow; supports the models:/<model_name>/suffix format and the runs:/ format. See https://mlflow.org/docs/latest/tracking.html#artifact-stores
mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000
deploy_mode – MLflow deploy mode; supports MLFLOW and DOCKER, default is DOCKER
port – deploy port, default is 7000
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _child_task_mlflow_attr = {'deploy_model_key', 'deploy_port', 'deploy_type'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- mlflow_task_type = 'MLflow Models'
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Return task params.
- property user_name: str | None
Return user name of process definition.
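The two model_uri formats named for MLflowModels can be told apart by their prefix. The helper below is a hypothetical illustration of that distinction; the sample URIs are made up, and the library may accept or reject values differently.

```python
def model_uri_kind(model_uri: str) -> str:
    """Hypothetical helper: classify a model URI by its documented prefix."""
    for prefix, kind in (("models:/", "models"), ("runs:/", "runs")):
        if model_uri.startswith(prefix):
            return kind
    raise ValueError(f"model_uri must start with 'models:/' or 'runs:/', got {model_uri!r}")


# Made-up URIs that follow the two documented formats.
print(model_uri_kind("models:/my_model/Production"))
print(model_uri_kind("runs:/0123456789abcdef/model"))
```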
- class pydolphinscheduler.tasks.MR(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', app_name: str | None = None, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]
Bases: Engine
Task mr object, declare behavior for mr task to dolphinscheduler.
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- get_jar_id() int
Get jar id from java gateway, a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)
Get resource info from java gateway, contains resource id, name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'main_args', 'others'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for engine children task.
Engine child tasks have some special attributes in task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.
- property user_name: str | None
Return user name of process definition.
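get_jar_id() is described as a wrapper for get_resource_info(), which returns the resource id and name. The sketch below shows that wrapper shape with a local dictionary in place of the Java gateway. SketchEngine, the "id" and "name" keys, and the sample resource are assumptions made for illustration.

```python
from typing import Any, Dict, Tuple


class SketchEngine:
    """Hypothetical stand-in for an Engine-based task such as MR."""

    # Fake gateway data keyed by (program_type, main_package); values are illustrative.
    _RESOURCES: Dict[Tuple[str, str], Dict[str, Any]] = {
        ("SCALA", "wordcount.jar"): {"id": 7, "name": "wordcount.jar"},
    }

    def __init__(self, program_type: str, main_package: str) -> None:
        self.program_type = program_type
        self.main_package = main_package

    def get_resource_info(self, program_type: str, main_package: str) -> Dict[str, Any]:
        # The real method asks the Java gateway; this sketch reads a local dict.
        return self._RESOURCES[(program_type, main_package)]

    def get_jar_id(self) -> int:
        # Thin wrapper: look the resource up, then return only its id.
        return self.get_resource_info(self.program_type, self.main_package)["id"]


task = SketchEngine(program_type="SCALA", main_package="wordcount.jar")
print(task.get_jar_id())
```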
- class pydolphinscheduler.tasks.OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs)[source]
Bases: Task
Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler.
- Parameters:
name – task name
zookeeper – OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.
zookeeper_path – OpenMLDB cluster zookeeper path, e.g. /openmldb.
execute_mode – Determines the initial mode, offline or online. You can switch it in the SQL statement itself.
sql – SQL statement.
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'execute_mode', 'sql', 'zk', 'zk_path'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
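The OpenMLDB constructor takes zookeeper and zookeeper_path, while _task_custom_attr lists zk and zk_path. The sketch below shows one plausible mapping between the two. It assumes zookeeper maps to zk and zookeeper_path to zk_path, which the names suggest but the listing does not state; openmldb_task_params is a hypothetical helper, and the execute_mode and sql values are placeholders.

```python
def openmldb_task_params(zookeeper: str, zookeeper_path: str, execute_mode: str, sql: str) -> dict:
    """Hypothetical helper mapping constructor arguments to the listed attribute names."""
    return {
        "zk": zookeeper,            # assumed to carry the zookeeper address
        "zk_path": zookeeper_path,  # assumed to carry the zookeeper path
        "execute_mode": execute_mode,
        "sql": sql,
    }


# Address and path are the examples from the parameter descriptions.
params = openmldb_task_params(
    zookeeper="127.0.0.1:2181",
    zookeeper_path="/openmldb",
    execute_mode="offline",
    sql="SELECT 1",
)
print(params)
```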
- class pydolphinscheduler.tasks.Procedure(name: str, datasource_name: str, method: str, *args, **kwargs)[source]
Bases: Task
Task Procedure object, declare behavior for Procedure task to dolphinscheduler.
It runs a database procedure job against multiple SQL-like engines, such as ClickHouse, DB2, HIVE, MySQL, Oracle, PostgreSQL, Presto, and SQLServer. You provide datasource_name, which contains the connection information; it determines which database type and database instance run this SQL.
- _get_attr() Set[str]
Get the final task_params attributes.
Based on _task_default_attr, add the attributes from _task_custom_attr and subtract the attributes in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set the dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate task code and version from java gateway.
If the task name does not yet exist in the process definition, the java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes sent to the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get object definition attribute by given attr set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'method'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for procedure task.
The procedure task has some special attributes in task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.Python(name: str, definition: str | LambdaType, *args, **kwargs)[source]
Bases: Task
Task Python object, declare behavior for Python task to dolphinscheduler.
The Python task supports two types of value for the definition parameter. Using a str:
python_task = Python(name="str_type", definition="print('Hello Python task.')")
Or using a Python callable:
def foo():
    print("Hello Python task.")
python_task = Python(name="str_type", definition=foo)
- Parameters:
name – The name of the Python task.
definition – String format of Python script you want to execute or Python callable you want to execute.
- _build_exe_str() str[source]
Build executable string from given definition.
The attribute self.definition is almost always a function, so it must be called after it has been converted to a string. The easiest way to call a function is the func() syntax, and that is what is used here.
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'raw_script'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property raw_script: str
Get python task define attribute raw_script.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
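The behaviour described for _build_exe_str() in the Python class can be illustrated with a short sketch. It is not the library's implementation: the helper names build_exe_str and append_call are hypothetical, and using inspect.getsource() to turn a callable into text is an assumption about how such a conversion could be done. It only handles named functions defined in a source file.

```python
import inspect
import textwrap
from typing import Callable, Union


def append_call(source: str, func_name: str) -> str:
    """Return source followed by a line that calls func_name()."""
    return f"{source.rstrip()}\n{func_name}()\n"


def build_exe_str(definition: Union[str, Callable]) -> str:
    """Turn a script string or a named function into a script string."""
    if isinstance(definition, str):
        # A plain string is assumed to be a complete script already.
        return definition
    # inspect.getsource needs the function to be defined in a real file.
    source = textwrap.dedent(inspect.getsource(definition))
    return append_call(source, definition.__name__)
```

Appending the call matters because a script that only defines foo() would run without producing any effect; the trailing foo() line is what makes the worker execute the body.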
- class pydolphinscheduler.tasks.Pytorch(name: str, script: str, script_params: str = '', project_path: str | None = '.', is_create_environment: bool | None = False, python_command: str | None = '${PYTHON_HOME}', python_env_tool: str | None = 'conda', requirements: str | None = 'requirements.txt', conda_python_version: str | None = '3.7', *args, **kwargs)[source]
Bases: Task
Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.
See also: DolphinScheduler Pytorch Task Plugin
- Parameters:
name – task name
script – Entry to the Python script file that you want to run.
script_params – Input parameters at run time.
project_path – The path to the project. Default ".".
is_create_environment – Whether to create a new environment. Default False.
python_command – The path to the python command. Default “${PYTHON_HOME}”.
python_env_tool – The python environment tool. Default “conda”.
requirements – The path to the requirements.txt file. Default “requirements.txt”.
conda_python_version – The python version of conda environment. Default “3.7”.
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'conda_python_version', 'is_create_environment', 'other_params', 'python_command', 'python_env_tool', 'python_path', 'requirements', 'script', 'script_params'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property other_params
Return other params.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
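The dependency members that every task class lists (_set_deps(), set_downstream(), _upstream_task_codes and _downstream_task_codes) can be pictured with a small stand-in class. MiniTask is hypothetical and keeps only an integer code and two sets; the real Task also records TaskRelation objects. Having set_upstream() and set_downstream() delegate to _set_deps() is an assumption made for this sketch.

```python
from typing import Iterable, Set, Union


class MiniTask:
    """Hypothetical stand-in for Task: tracks only neighbour codes."""

    def __init__(self, code: int) -> None:
        self.code = code
        self.upstream_codes: Set[int] = set()
        self.downstream_codes: Set[int] = set()

    def _set_deps(
        self,
        tasks: Union["MiniTask", Iterable["MiniTask"]],
        upstream: bool = True,
    ) -> None:
        # Accept either a single task or a sequence of tasks.
        if isinstance(tasks, MiniTask):
            tasks = [tasks]
        for other in tasks:
            if upstream:
                self.upstream_codes.add(other.code)
                other.downstream_codes.add(self.code)
            else:
                self.downstream_codes.add(other.code)
                other.upstream_codes.add(self.code)

    def set_upstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=False)


extract, transform, load = MiniTask(1), MiniTask(2), MiniTask(3)
extract.set_downstream([transform, load])
```

Recording the relation on both tasks keeps the two sets consistent, so either side can be asked for its neighbours.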
- class pydolphinscheduler.tasks.SageMaker(name: str, sagemaker_request_json: str, *args, **kwargs)[source]
Bases: Task
Task SageMaker object, declare behavior for SageMaker task to dolphinscheduler.
- Parameters:
name – A unique, meaningful string for the SageMaker task.
sagemaker_request_json – Request parameters of StartPipelineExecution, see also AWS API
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'sagemaker_request_json'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
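A minimal sketch of how a value for the sagemaker_request_json parameter of SageMaker could be assembled. The field names follow the AWS StartPipelineExecution request; the pipeline name, display name and parameter values are placeholders, not values from this documentation.

```python
import json

# Placeholder values; replace them with a real pipeline name and parameters.
request = {
    "PipelineName": "example-pipeline",
    "PipelineExecutionDisplayName": "example-run",
    "PipelineParameters": [
        {"Name": "InputDataUrl", "Value": "s3://example-bucket/input/"},
    ],
}

# The parameter is typed as str, so serialise the dict to JSON text.
sagemaker_request_json = json.dumps(request, indent=2)
```

The resulting string would then be passed as SageMaker(name=..., sagemaker_request_json=sagemaker_request_json).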
- class pydolphinscheduler.tasks.Shell(name: str, command: str, *args, **kwargs)[source]
Bases: Task
Task shell object, declare behavior for shell task to dolphinscheduler.
- Parameters:
name – A unique, meaningful string for the shell task.
command –
One or more commands to run in this task.
It can be a single command:
Shell(name=..., command="echo task shell")
or several commands that together perform a more complex task:
command = '''echo task shell step 1;
echo task shell step 2;
echo task shell step 3
'''
Shell(name=..., command=command)
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'raw_script'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
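The rule documented for gen_code_and_version() (a new name gets a fresh code and version 0, a known name gets its existing pair back) can be sketched without a Java gateway. The in-memory registry, the starting code 1000 and the task names below are all hypothetical stand-ins for whatever the gateway actually stores.

```python
from typing import Dict, Tuple

# Hypothetical in-memory registry standing in for the Java gateway.
_registry: Dict[str, Tuple[int, int]] = {}
_next_code = 1000  # arbitrary starting value for generated codes


def gen_code_and_version(task_name: str) -> Tuple[int, int]:
    """Return (code, version) for task_name, creating them on first use."""
    global _next_code
    if task_name not in _registry:
        # New name: allocate a fresh code and start at version 0.
        _registry[task_name] = (_next_code, 0)
        _next_code += 1
    return _registry[task_name]


first = gen_code_and_version("task_shell")
again = gen_code_and_version("task_shell")
other = gen_code_and_version("task_python")
```

Here first and again are the same pair, which is the property that lets a workflow file be submitted repeatedly without creating duplicate tasks.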
- class pydolphinscheduler.tasks.Spark(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', spark_version: SparkVersion | None = 'SPARK2', app_name: str | None = None, driver_cores: int | None = 1, driver_memory: str | None = '512M', num_executors: int | None = 2, executor_memory: str | None = '2G', executor_cores: int | None = 2, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]
Bases: Engine
Task spark object, declare behavior for spark task to dolphinscheduler.
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- get_jar_id() int
Get jar id from java gateway, a wrapper for get_resource_info().
- get_resource_info(program_type, main_package)
Get resource info from java gateway, contains resource id, name.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'app_name', 'deploy_mode', 'driver_cores', 'driver_memory', 'executor_cores', 'executor_memory', 'main_args', 'num_executors', 'others', 'spark_version'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for engine child tasks.
Engine child tasks have some special attributes in task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.
- property user_name: str | None
Return user name of process definition.
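The camel_attr flag of get_define() and get_define_custom() is not explained further in this reference. The sketch below assumes it means "convert snake_case attribute names to camelCase keys"; the helper snake_to_camel, the free function get_define and the DemoTask class are hypothetical and only illustrate that reading.

```python
from typing import Dict, Iterable


def snake_to_camel(name: str) -> str:
    """Convert snake_case to camelCase, e.g. task_type -> taskType."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def get_define(obj: object, attrs: Iterable[str], camel_attr: bool = True) -> Dict:
    """Collect the named attributes of obj into a dict."""
    result = {}
    for attr in attrs:
        key = snake_to_camel(attr) if camel_attr else attr
        result[key] = getattr(obj, attr, None)
    return result


class DemoTask:
    """Hypothetical object carrying two of the _DEFINE_ATTR names."""

    name = "demo"
    fail_retry_times = 3


define = get_define(DemoTask(), {"name", "fail_retry_times"})
```

Under this assumption, Python code keeps its snake_case attribute names while the dict sent to the Java side uses the camelCase keys common in Java.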
- class pydolphinscheduler.tasks.Sql(name: str, datasource_name: str, sql: str, sql_type: str | None = None, pre_statements: str | None = None, post_statements: str | None = None, display_rows: int | None = 10, *args, **kwargs)[source]
Bases: Task
Task SQL object, declare behavior for SQL task to dolphinscheduler.
It runs a SQL job on one of several SQL-like engines, such as:
- ClickHouse
- DB2
- HIVE
- MySQL
- Oracle
- Postgresql
- Presto
- SQLServer
The datasource_name you provide contains the connection information; it determines which database type and database instance run this SQL.
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property sql_type: str
Determine the SQL type; the result is either SELECT or NOT_SELECT.
If param_sql_type is not specified, a regular expression is used to detect the type of the SQL statement. If param_sql_type is specified, it overrides the regular-expression check.
- property task_params: Dict
Override Task.task_params for the SQL task.
The SQL task has some special attributes in task_params, and setting them directly as Python properties would be awkward, so Task.task_params is overridden here.
- property user_name: str | None
Return user name of process definition.
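The decision described for the sql_type property (an explicit value wins, otherwise a regular expression classifies the statement) can be sketched as follows. The pattern, the function name guess_sql_type and the plain "SELECT" / "NOT_SELECT" return strings are assumptions for illustration; the library's own pattern and return values may differ.

```python
import re
from typing import Optional

# Assumption for this sketch: statements that begin with one of these
# keywords return rows and therefore count as SELECT.
_SELECT_PATTERN = re.compile(r"^\s*(select|show|describe|explain)\b", re.IGNORECASE)


def guess_sql_type(sql: str, param_sql_type: Optional[str] = None) -> str:
    """Return param_sql_type if given, else guess from the statement text."""
    if param_sql_type is not None:
        # An explicit value overrides the regular-expression check.
        return param_sql_type
    return "SELECT" if _SELECT_PATTERN.match(sql) else "NOT_SELECT"
```

A keyword check like this is only a heuristic, which is why being able to pass sql_type explicitly is useful for statements it would misclassify.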
- class pydolphinscheduler.tasks.SubProcess(name: str, process_definition_name: str, *args, **kwargs)[source]
Bases: Task
Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- get_process_definition_info(process_definition_name: str) Dict[source]
Get process definition info from java gateway, contains process definition id, name, code.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {'process_definition_code'}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property process_definition_code: str
Get process definition code, a wrapper for get_process_definition_info().
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict | None
Get task parameter object.
The result combines _task_custom_attr and custom_attr.
- property user_name: str | None
Return user name of process definition.
- class pydolphinscheduler.tasks.Switch(name: str, condition: SwitchCondition, *args, **kwargs)[source]
Bases: Task
Task switch object, declare behavior for switch task to dolphinscheduler.
If a workflow contains a switch task, a param of the process definition or at least one local param of the task must be set.
- _get_attr() Set[str]
Get the final set of task_params attributes for the task.
Based on _task_default_attr, add the attributes in _task_custom_attr and subtract those in _task_ignore_attr.
- _set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None
Set a dependency between the given tasks and the current task.
It is a wrapper for set_upstream() and set_downstream().
- gen_code_and_version() Tuple
Generate the task code and version from the Java gateway.
If the task name does not yet exist in the process definition, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
- get_define(camel_attr: bool = True) Dict
Get the object definition attributes used to communicate with the Java gateway server.
The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
- get_define_custom(camel_attr: bool = True, custom_attr: set = None) Dict
Get the object definition attributes for the given attribute set.
- set_downstream(tasks: Task | Sequence[Task]) None
Set parameter tasks as downstream to current task.
- DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
- _DEFAULT_ATTR: Dict = {}
- _DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'name', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
- _KEY_ATTR: set = {'description', 'name'}
- _downstream_task_codes: Set[int]
- _task_custom_attr: set = {}
- _task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
- _task_ignore_attr: set = {'condition_result', 'dependence'}
- _task_relation: Set[TaskRelation]
- _upstream_task_codes: Set[int]
- property condition_result: Dict
Get attribute condition_result.
- property environment_code: str
Convert environment name to code.
- property process_definition: ProcessDefinition | None
Get attribute process_definition.
- property resource_list: List
Get task define attribute resource_list.
- property task_params: Dict
Override Task.task_params for the switch task.
The switch task has a special attribute, switch; in most tasks this attribute is None, and an empty dict {} is used as its default value. The class attribute _task_custom_attr is not used for it, to avoid overwriting attributes.
- property user_name: str | None
Return user name of process definition.
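The rule stated for _get_attr() (start from _task_default_attr, add _task_custom_attr, subtract _task_ignore_attr) is plain set arithmetic. A worked example using the values listed for Switch; the variable names are local to this sketch.

```python
# Values copied from the Switch attribute listing.
task_default_attr = {
    "condition_result",
    "dependence",
    "local_params",
    "resource_list",
    "wait_start_timeout",
}
task_custom_attr = set()  # Switch lists no custom attributes
task_ignore_attr = {"condition_result", "dependence"}

# Union with the custom attributes, then remove the ignored ones.
final_attr = (task_default_attr | task_custom_attr) - task_ignore_attr
```

For Switch this leaves local_params, resource_list and wait_start_timeout.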
Constants
Constants for pydolphinscheduler.
- class pydolphinscheduler.constants.DefaultTaskCodeNum[source]
Bases: str
Constants and default value for default task code number.
- DEFAULT = 1
- class pydolphinscheduler.constants.Delimiter[source]
Bases: str
Constants for delimiter.
- BAR = '-'
- COLON = ':'
- DASH = '/'
- DIRECTION = '->'
- UNDERSCORE = '_'
- class pydolphinscheduler.constants.JavaGatewayDefault[source]
Bases: str
Constants and default value for java gateway.
- RESULT_DATA = 'data'
- RESULT_MESSAGE_KEYWORD = 'msg'
- RESULT_MESSAGE_SUCCESS = 'success'
- RESULT_STATUS_KEYWORD = 'status'
- RESULT_STATUS_SUCCESS = 'SUCCESS'
- class pydolphinscheduler.constants.ResourceKey[source]
Bases: str
Constants for key of resource.
- ID = 'id'
- class pydolphinscheduler.constants.TaskFlag[source]
Bases: str
Constants for task flag.
- NO = 'NO'
- YES = 'YES'
- class pydolphinscheduler.constants.TaskPriority[source]
Bases: str
Constants for task priority.
- HIGH = 'HIGH'
- HIGHEST = 'HIGHEST'
- LOW = 'LOW'
- LOWEST = 'LOWEST'
- MEDIUM = 'MEDIUM'
- class pydolphinscheduler.constants.TaskTimeoutFlag[source]
Bases: str
Constants for task timeout flag.
- CLOSE = 'CLOSE'
- class pydolphinscheduler.constants.TaskType[source]
Bases: str
Constants for task type; they also show which task types are supported so far.
- CONDITIONS = 'CONDITIONS'
- DATAX = 'DATAX'
- DEPENDENT = 'DEPENDENT'
- DVC = 'DVC'
- FLINK = 'FLINK'
- HTTP = 'HTTP'
- MLFLOW = 'MLFLOW'
- MR = 'MR'
- OPENMLDB = 'OPENMLDB'
- PROCEDURE = 'PROCEDURE'
- PYTHON = 'PYTHON'
- PYTORCH = 'PYTORCH'
- SAGEMAKER = 'SAGEMAKER'
- SHELL = 'SHELL'
- SPARK = 'SPARK'
- SQL = 'SQL'
- SUB_PROCESS = 'SUB_PROCESS'
- SWITCH = 'SWITCH'
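Each class in this section derives from str and exposes plain class attributes. A minimal sketch of how such constants might be used; the two classes below are local copies of values listed above rather than imports from pydolphinscheduler, and the names being joined are made up.

```python
class Delimiter(str):
    """Local copy of two of the delimiter constants listed above."""

    DIRECTION = "->"
    UNDERSCORE = "_"


class TaskPriority(str):
    """Local copy of the task priority constants listed above."""

    HIGHEST = "HIGHEST"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    LOWEST = "LOWEST"


# Class attributes are ordinary strings, so they work with str methods.
edge_label = Delimiter.DIRECTION.join(["extract", "load"])
task_name = Delimiter.UNDERSCORE.join(["daily", "report"])
priority = TaskPriority.MEDIUM
```

Referring to TaskPriority.MEDIUM instead of the literal 'MEDIUM' means a misspelled name fails immediately with an AttributeError rather than travelling to the server as an invalid string.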
Exceptions
Exceptions for pydolphinscheduler.
- exception pydolphinscheduler.exceptions.PyDSBaseException[source]
Bases: Exception
Base exception for pydolphinscheduler.
- exception pydolphinscheduler.exceptions.PyDSConfException[source]
Bases: PyDSBaseException
Exception for pydolphinscheduler configuration error.
- exception pydolphinscheduler.exceptions.PyDSJavaGatewayException[source]
Bases: PyDSBaseException
Exception for pydolphinscheduler Java gateway error.
- exception pydolphinscheduler.exceptions.PyDSParamException[source]
Bases: PyDSBaseException
Exception for pydolphinscheduler parameter verification error.
- exception pydolphinscheduler.exceptions.PyDSProcessDefinitionNotAssignException[source]
Bases: PyDSBaseException
Exception for pydolphinscheduler process definition not assigned error.
- exception pydolphinscheduler.exceptions.PyDSTaskNoFoundException[source]
Bases: PyDSBaseException
Exception for pydolphinscheduler workflow task not found error.
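Because every exception in this section derives from PyDSBaseException, a single except clause on the base class catches all of them. The sketch below mirrors two of the classes locally instead of importing them, and check_priority is a hypothetical validator used only to raise one.

```python
class PyDSBaseException(Exception):
    """Local stand-in mirroring the base exception listed above."""


class PyDSParamException(PyDSBaseException):
    """Local stand-in mirroring the parameter exception listed above."""


def check_priority(value: str) -> str:
    """Hypothetical validator that raises the parameter exception."""
    if value not in {"HIGHEST", "HIGH", "MEDIUM", "LOW", "LOWEST"}:
        raise PyDSParamException(f"Unknown task priority: {value}")
    return value


try:
    check_priority("URGENT")
except PyDSBaseException as exc:
    # The subclass is caught through its base class.
    caught = type(exc).__name__
```

Catching the base class suits top-level error handling; catching a specific subclass such as PyDSParamException is the better choice when only one kind of failure can be recovered from.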