API

Core

Init pydolphinscheduler.core package.

class pydolphinscheduler.core.Engine(name: str, task_type: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', *args, **kwargs)[source]

Bases: BatchTask

Task engine object; declares the behavior of engine tasks for DolphinScheduler.

This is the parent class of spark, flink and mr tasks, and is used to provide the programType, mainClass and mainJar task parameters for reuse.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, adds the attributes in _task_custom_attr and removes the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the dependency between the tasks given in tasks and the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_jar_id() int[source]

Get jar id from java gateway, a wrapper for get_resource_info().

get_plugin()

Return the resource plug-in, chosen according to the parameter resource_plugin and the parameter workflow.resource_plugin.

get_resource_info(program_type, main_package)[source]

Get resource info from the Java gateway, containing the resource id and name.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_task_custom_attr: set = {}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for engine child tasks.

Child tasks have some special attributes for task_params, and it would be odd to set them directly as Python properties, so Task.task_params is overridden here.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.core.Task(name: str, task_type: str, description: str | None = None, flag: str | None = 'YES', task_priority: str | None = 'MEDIUM', worker_group: str | None = 'default', environment_name: str | None = None, task_group_id: int | None = 0, task_group_priority: int | None = 0, delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_notify_strategy: str | None = None, timeout: timedelta | int | None = None, workflow: Workflow | None = None, resource_list: list | None = None, dependence: dict | None = None, wait_start_timeout: dict | None = None, condition_result: dict | None = None, resource_plugin: ResourcePlugin | None = None, is_cache: bool | None = False, input_params: dict | None = None, output_params: dict | None = None, *args, **kwargs)[source]

Bases: Base

Task object, the parent class of all concrete task types.

Parameters:
  • name – The name of the task. Node names within the same workflow must be unique.

  • task_type

  • description – default None

  • flag – default TaskFlag.YES

  • task_priority – default TaskPriority.MEDIUM

  • worker_group – default configuration.WORKFLOW_WORKER_GROUP

  • environment_name – default None

  • task_group_id – Identifier of the task group used to restrict the parallelism of task instance runs, default 0.

  • task_group_priority – Priority within the same task group; the higher the value, the higher the priority, default 0.

  • delay_time – default 0

  • fail_retry_times – default 0

  • fail_retry_interval – default 1

  • timeout_notify_strategy – default None

  • timeout – Timeout for the task, in minutes. A task is considered timed out when its running time exceeds this value. A datetime.timedelta value is converted to int (in minutes). Default None.

  • resource_list – default None

  • wait_start_timeout – default None

  • condition_result – default None

  • resource_plugin – default None

  • is_cache – default False

  • input_params – default None, input parameters, {param_name: param_value}

  • output_params – default None, output parameters, {param_name: param_value}
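
The timedelta handling described for timeout can be sketched as follows. This is a minimal illustration, not the library's code: the helper name timeout_in_minutes is hypothetical, and rounding partial minutes up is an assumption, since the text only says the value is converted to an int in minutes.

```python
import math
from datetime import timedelta


def timeout_in_minutes(timeout):
    """Normalize a timeout to whole minutes (hypothetical helper)."""
    if timeout is None:
        # No timeout configured; leave it unset.
        return None
    if isinstance(timeout, timedelta):
        # Assumption: a partial minute counts as a full minute.
        return math.ceil(timeout.total_seconds() / 60)
    return int(timeout)


print(timeout_in_minutes(timedelta(hours=1, minutes=30)))
print(timeout_in_minutes(15))
```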

_get_attr() set[str][source]

Get final task task_params attribute.

Based on _task_default_attr, adds the attributes in _task_custom_attr and removes the attributes in _task_ignore_attr.
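
The set arithmetic behind this description can be sketched as follows. It is an illustration only, not the library's implementation: the function name final_task_attrs is hypothetical, the default and custom sets are copied from the class attributes listed on this page, and the ignore set is an invented value chosen to show the subtraction.

```python
def final_task_attrs(default_attr: set, custom_attr: set, ignore_attr: set) -> set:
    """Union of default and custom attributes, minus the ignored ones."""
    return (default_attr | custom_attr) - ignore_attr


default_attr = {
    "condition_result",
    "dependence",
    "local_params",
    "resource_list",
    "wait_start_timeout",
}
custom_attr = {"custom_config", "json", "xms", "xmx"}  # as listed for CustomDataX
ignore_attr = {"dependence"}  # hypothetical value, for illustration only

attrs = final_task_attrs(default_attr, custom_attr, ignore_attr)
print(sorted(attrs))
```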

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None[source]

Set the dependency between the tasks given in tasks and the current task.

It is a wrapper for set_upstream() and set_downstream().
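
One plausible arrangement of these three methods is sketched below. MiniTask is a hypothetical stand-in that only records task names; it assumes set_upstream() and set_downstream() both delegate to _set_deps() with a different upstream flag, which the signatures suggest but this page does not spell out.

```python
from __future__ import annotations

from collections.abc import Sequence


class MiniTask:
    """Hypothetical stand-in for Task that only tracks dependency names."""

    def __init__(self, name: str):
        self.name = name
        self.upstream: set[str] = set()
        self.downstream: set[str] = set()

    def _set_deps(self, tasks: MiniTask | Sequence[MiniTask], upstream: bool = True) -> None:
        # Accept either a single task or a sequence of tasks.
        if isinstance(tasks, MiniTask):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self.upstream.add(task.name)
                task.downstream.add(self.name)
            else:
                self.downstream.add(task.name)
                task.upstream.add(self.name)

    def set_upstream(self, tasks: MiniTask | Sequence[MiniTask]) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks: MiniTask | Sequence[MiniTask]) -> None:
        self._set_deps(tasks, upstream=False)


extract, transform, load = MiniTask("extract"), MiniTask("transform"), MiniTask("load")
transform.set_upstream(extract)
transform.set_downstream([load])
print(transform.upstream, transform.downstream)
```

Recording the relation on both tasks keeps the graph consistent whichever side declares it.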

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)[source]

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)[source]

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple[source]

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
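
The lookup-or-create behavior described here can be sketched with an in-memory stand-in. FakeGateway and its method name are hypothetical and do not reflect the real gateway client; the starting code value is arbitrary.

```python
from __future__ import annotations


class FakeGateway:
    """Hypothetical in-memory stand-in for the Java gateway."""

    def __init__(self) -> None:
        self._tasks: dict[tuple[str, str], tuple[int, int]] = {}
        self._next_code = 1000  # arbitrary starting code for the sketch

    def get_code_and_version(self, workflow_name: str, task_name: str) -> tuple[int, int]:
        key = (workflow_name, task_name)
        if key not in self._tasks:
            # Unknown task name: allocate a new code and start at version 0.
            self._tasks[key] = (self._next_code, 0)
            self._next_code += 1
        # Known task name: return the code and version already stored.
        return self._tasks[key]


gateway = FakeGateway()
first = gateway.get_code_and_version("etl", "extract")
again = gateway.get_code_and_version("etl", "extract")
other = gateway.get_code_and_version("etl", "load")
print(first, again, other)
```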

get_content()[source]

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()[source]

Return the resource plug-in, chosen according to the parameter resource_plugin and the parameter workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None[source]

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None[source]

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_task_custom_attr: set = {}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | LambdaType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.core.Workflow(name: str, description: str | None = None, schedule: str | None = None, online_schedule: bool | None = None, start_time: str | datetime | None = None, end_time: str | datetime | None = None, timezone: str | None = 'Asia/Shanghai', user: str | None = 'userPythonGateway', project: str | None = 'project-pydolphin', worker_group: str | None = 'default', warning_type: str | None = 'NONE', warning_group_id: int | None = 0, execution_type: str | None = 'parallel', timeout: timedelta | int | None = 0, release_state: str | None = 'online', param: dict | None = None, resource_plugin: ResourcePlugin | None = None, resource_list: list[Resource] | None = None, *args, **kwargs)[source]

Bases: Base

Workflow object; defines the workflow's attributes, tasks, and relations.

TODO: maybe we should rename this class, currently use DS object name.

Parameters:
  • online_schedule – Whether the workflow schedule is brought online. It is configured automatically from schedule: if schedule is assigned a valid value, online_schedule is set to True. You can also set online_schedule manually. For example, to set the workflow schedule without bringing the schedule online, set online_schedule to False.

  • execution_type

    Decides how to run when the workflow has multiple instances. When the workflow schedule interval is too short, multiple instances may run at the same time. This parameter controls how those workflow instances run. There are currently four execution types:

    • PARALLEL: Default value. All instances are allowed to run even if the previous instance has not finished.

    • SERIAL_WAIT: Each instance waits for the previous instance to finish, and the waiting instances are executed in scheduling order.

    • SERIAL_DISCARD: An instance is discarded (abandoned) if the previous instance has not finished.

    • SERIAL_PRIORITY: Each instance waits for the previous instance to finish, and the waiting instances are executed in workflow priority order.

  • timeout – Timeout attribute for task, in minutes. A task is considered timed out when its running time exceeds this value. A datetime.timedelta value is converted to int (in minutes). Default 0.

  • user – The user of the current workflow. A new user is created if it does not exist. If the project given by the parameter project already exists but its creator is not user, the project is granted to user automatically.

  • project – The project of the current workflow. You can see the workflow in this project through the web UI after submit() or run(). A new project belonging to user is created if it does not exist. If the project exists but its creator is not user, the project is granted to user automatically.

  • resource_list – Resource files required by the current workflow. You can create and modify resource files through this field. When the workflow is submitted, these resource files are submitted along with it.
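
The four execution types listed for execution_type can be read as a decision about what happens to a newly triggered instance. The sketch below is an interpretation of that list, not scheduler code: the function decide and its return labels are hypothetical, and it does not model the ordering difference between SERIAL_WAIT (scheduling order) and SERIAL_PRIORITY (workflow priority order).

```python
EXECUTION_TYPES = {"PARALLEL", "SERIAL_WAIT", "SERIAL_DISCARD", "SERIAL_PRIORITY"}


def decide(execution_type: str, previous_running: bool) -> str:
    """Return 'run', 'wait', or 'discard' for a newly triggered instance."""
    mode = execution_type.upper()  # the constructor default is the lowercase 'parallel'
    if mode not in EXECUTION_TYPES:
        raise ValueError(f"unknown execution type: {execution_type}")
    if mode == "PARALLEL" or not previous_running:
        return "run"
    if mode == "SERIAL_DISCARD":
        return "discard"
    # SERIAL_WAIT and SERIAL_PRIORITY both queue the instance.
    return "wait"


for mode in sorted(EXECUTION_TYPES):
    print(mode, decide(mode, previous_running=True))
```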

_ensure_side_model_exists()[source]

Ensure the workflow's side models exist.

For now, the side models are pydolphinscheduler.models.project.Project, pydolphinscheduler.models.tenant.Tenant, and pydolphinscheduler.models.user.User. If one of these models does not exist, it is created with default values from pydolphinscheduler.configuration.

_handle_root_relation()[source]

Handle the pydolphinscheduler.core.task.TaskRelation of root tasks.

A root task in the DAG has no upstream node, but a default upstream task with task_code equal to 0 must still be added for it. This is required by the Java gateway interface.
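
The placeholder upstream for root tasks can be sketched as follows. The function build_relations and the dict keys pre_task_code and post_task_code are assumptions made for the illustration; only the rule itself (root tasks get an upstream with code 0) comes from the description.

```python
def build_relations(task_codes, edges):
    """Return relation dicts, giving every root task a placeholder upstream code 0."""
    relations = [{"pre_task_code": pre, "post_task_code": post} for pre, post in edges]
    # A task is a root when it never appears as the downstream end of an edge.
    has_upstream = {post for _, post in edges}
    for code in task_codes:
        if code not in has_upstream:
            relations.append({"pre_task_code": 0, "post_task_code": code})
    return relations


# A three-task chain: 11 -> 12 -> 13, so only 11 is a root task.
relations = build_relations([11, 12, 13], [(11, 12), (12, 13)])
print(relations)
```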

static _parse_datetime(val: Any) Any[source]
_pre_submit_check()[source]

Check that specific conditions are satisfied before submitting.

This method should be called before the workflow is submitted to the Java gateway. For now, it has the following check:

  • self.param or at least one local param of a task should be set if the workflow contains a switch task.

add_task(task: Task) None[source]

Add a single task to workflow.

add_tasks(tasks: list[Task]) None[source]

Add a sequence of tasks to the workflow; it is a wrapper of add_task().

get_define(camel_attr: bool = True) dict

Get object definition attribute communicate to Java gateway server.

use attribute self._DEFINE_ATTR to determine which attributes should including when object tries to communicate with Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_one_task_by_name(name: str) Task[source]

Get exactly one task from the workflow by the given name.

The function always returns one task, even if the workflow has more than one task with this name.

get_task(code: str) Task[source]

Get task object from workflow by given code.

get_tasks_by_name(name: str) set[Task][source]

Get task objects by the given name; it returns all tasks with this name.
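
The difference between the two name lookups can be sketched with plain data. NamedTask and both functions are hypothetical stand-ins; picking the lowest code when several tasks share a name, and raising ValueError when none match, are assumptions made so the example is deterministic.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class NamedTask:
    """Hypothetical stand-in for Task, hashable so it can live in a set."""

    code: int
    name: str


def get_tasks_by_name(tasks: dict[int, NamedTask], name: str) -> set[NamedTask]:
    # Every task with this name, possibly more than one.
    return {task for task in tasks.values() if task.name == name}


def get_one_task_by_name(tasks: dict[int, NamedTask], name: str) -> NamedTask:
    matches = get_tasks_by_name(tasks, name)
    if not matches:
        raise ValueError(f"no task named {name!r}")
    # Assumption: pick the lowest code so the result is deterministic.
    return min(matches, key=lambda task: task.code)


tasks = {1: NamedTask(1, "load"), 2: NamedTask(2, "load"), 3: NamedTask(3, "report")}
print(len(get_tasks_by_name(tasks, "load")))
print(get_one_task_by_name(tasks, "load"))
```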

run()[source]

Submit and start the Workflow instance.

Shortcut for submit() followed by start(). Only manual start of the workflow is supported for now; scheduled runs are coming soon.

start() None[source]

Create and start the Workflow instance, which posts to start-process-instance on the Java gateway.

submit() int[source]

Submit Workflow instance to java gateway.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'_project', 'description', 'execution_type', 'name', 'param', 'release_state', 'resource_list', 'task_definition_json', 'task_relation_json', 'tasks', 'timeout', 'warning_group_id', 'warning_type', 'worker_group'}
_EXPECT_SCHEDULE_CHAR_NUM = 7
_KEY_ATTR: set = {'name', 'param', 'project', 'release_state'}
property end_time: Any

Get attribute end_time.

property execution_type: str

Get attribute execution_type.

property param_json: list[dict] | None

Return param json based on self.param.

property project: Project

Get attribute project.

property release_state: int

Get attribute release_state.

property schedule_json: dict | None

Get the schedule parameter json object. This is required by the Java gateway interface.

property start_time: Any

Get attribute start_time.

property task_definition_json: list[dict]

Return all tasks definition in list of dict.

property task_list: list[Task]

Return list of tasks objects.

property task_relation_json: list[dict]

Return all relations between task pairs as a list of dicts.

property timeout: int

Get attribute timeout.

property user: User

Get user object.

For now the user is taken from the Python models rather than from the Java gateway models, so it may not be correct.

Models

Init Models package, keeping objects related to DolphinScheduler converted from the Java Gateway Service.

class pydolphinscheduler.models.Base(name: str, description: str | None = None)[source]

Bases: object

DolphinScheduler Base object.

get_define(camel_attr: bool = True) dict[source]

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
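
How a _DEFINE_ATTR set and the camel_attr flag could drive the returned dict is sketched below. MiniBase and snake_to_camel are hypothetical names, and the three attributes are chosen only for illustration; the real class may collect or convert attributes differently.

```python
def snake_to_camel(name: str) -> str:
    """Convert snake_case to camelCase, e.g. worker_group -> workerGroup."""
    head, *rest = name.split("_")
    return head + "".join(part.title() for part in rest)


class MiniBase:
    """Hypothetical stand-in for Base with a small _DEFINE_ATTR set."""

    _DEFINE_ATTR = {"name", "description", "worker_group"}

    def __init__(self, name, description=None, worker_group="default"):
        self.name = name
        self.description = description
        self.worker_group = worker_group

    def get_define(self, camel_attr: bool = True) -> dict:
        define = {}
        for attr in self._DEFINE_ATTR:
            # Only attributes named in _DEFINE_ATTR are exported.
            key = snake_to_camel(attr) if camel_attr else attr
            define[key] = getattr(self, attr)
        return define


obj = MiniBase("demo")
print(obj.get_define())
print(obj.get_define(camel_attr=False))
```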

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict[source]

Get object definition attribute by given attr set.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.models.BaseSide(name: str, description: str | None = None)[source]

Bases: Base

Base class for model objects; it declares the base behavior for them.

classmethod create_if_not_exists(user='userPythonGateway')[source]

Create Base if not exists.

delete_all()[source]

Delete all method.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.models.Project(name: str = 'project-pydolphin', description: str | None = None, code: int | None = None)[source]

Bases: BaseSide

DolphinScheduler Project object.

create_if_not_exists(user='userPythonGateway') None[source]

Create Project if not exists.

delete(user='userPythonGateway') None[source]

Delete Project.

delete_all()

Delete all method.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

classmethod get_project_by_name(user='userPythonGateway', name=None) Project[source]

Get Project by name.

update(user='userPythonGateway', project_code=None, project_name=None, description=None) None[source]

Update Project.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.models.Queue(name: str = 'queuePythonGateway', description: str | None = '')[source]

Bases: BaseSide

DolphinScheduler Queue object.

classmethod create_if_not_exists(user='userPythonGateway')

Create Base if not exists.

delete_all()

Delete all method.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.models.Tenant(name: str = 'tenant_pydolphin', queue: str = 'queuePythonGateway', description: str | None = None, tenant_id: int | None = None, code: str | None = None, user_name: str | None = None)[source]

Bases: BaseSide

DolphinScheduler Tenant object.

create_if_not_exists(queue_name: str, user='userPythonGateway') None[source]

Create Tenant if not exists.

delete() None[source]

Delete Tenant.

delete_all()

Delete all method.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

classmethod get_tenant(code: str) Tenant[source]

Get Tenant by code.

update(user='userPythonGateway', code=None, queue_id=None, description=None) None[source]

Update Tenant.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}
class pydolphinscheduler.models.User(name: str, password: str | None = 'userPythonGateway', email: str | None = 'userPythonGateway@dolphinscheduler.com', phone: str | None = '11111111111', tenant: str | None = 'tenant_pydolphin', queue: str | None = 'queuePythonGateway', status: int | None = 1)[source]

Bases: BaseSide

DolphinScheduler User object.

create_if_not_exists(**kwargs)[source]

Create User if not exists.

create_tenant_if_not_exists() None[source]

Create tenant object.

delete() None[source]

Delete User.

delete_all()

Delete all method.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

classmethod get_user(user_id) User[source]

Get User.

update(password=None, email=None, phone=None, tenant=None, queue=None, status=None) None[source]

Update User.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'email', 'name', 'password', 'phone', 'queue', 'status', 'tenant'}
class pydolphinscheduler.models.WorkerGroup(name: str, address: str, description: str | None = None)[source]

Bases: BaseSide

DolphinScheduler Worker Group object.

classmethod create_if_not_exists(user='userPythonGateway')

Create Base if not exists.

delete_all()

Delete all method.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {}
_KEY_ATTR: set = {'description', 'name'}

Tasks

Init pydolphinscheduler.tasks package.

class pydolphinscheduler.tasks.Condition(name: str, condition: ConditionOperator, success_task: Task, failed_task: Task, *args, **kwargs)[source]

Bases: BatchTask

Task condition object; declares the behavior of condition tasks for DolphinScheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, adds the attributes in _task_custom_attr and removes the attributes in _task_ignore_attr.

_set_dep() None[source]

Set upstream according to parameter condition.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the dependency between the tasks given in tasks and the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, chosen according to the parameter resource_plugin and the parameter workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get condition result define for java gateway.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for Condition task.

The Condition task has a special attribute, dependence. In most tasks this attribute is None and an empty dict {} is used as the default value. The class attribute _task_custom_attr is not used here, to avoid overwriting attributes.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.CustomDataX(name: str, json: str, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task CustomDataX object; declares the behavior of custom DataX tasks for DolphinScheduler.

You provide a JSON template for DataX, and it synchronizes data according to the template you provided.

Parameters:
  • name – task name for this task

  • json

    JSON template string, or path to a JSON file, for the custom DataX task. CustomDataX does not format the JSON template; you must format it yourself.

    • Use a config string directly instead of a JSON file path:

      • Use json.dumps() to format it if your JSON template is a dict.

        import json

        custom = CustomDataX(
            name="custom_datax",
            json=json.dumps({"job": {"content": [{"reader": {"name": "mysqlreader"}}]}}),
        )

      • Or format it manually if your JSON template is a native str.

    • Use a JSON file path. The format shown in the web UI depends on the content of your JSON file.

      custom = CustomDataX(
          name="custom_datax",
          # web UI datax config will show as json file content
          json="/path/to/datax.json",
      )

  • xms – JVM parameter for the minimum memory of the running DataX task, default is 1 (1g)

  • xmx – JVM parameter for the maximum memory of the running DataX task, default is 1 (1g)
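
Because json accepts either an inline template or a file path, something has to tell the two apart. The sketch below shows one heuristic based on the ext value {'.json'} listed for this class; the helper looks_like_json_file is hypothetical, and the library's own check, which goes through the resource plug-in, may differ.

```python
import json
from pathlib import Path

EXT = {".json"}  # mirrors the ext attribute listed for CustomDataX


def looks_like_json_file(value: str) -> bool:
    """Guess whether value is a file path rather than an inline template."""
    return Path(value).suffix in EXT


inline = json.dumps({"job": {"content": [{"reader": {"name": "mysqlreader"}}]}})
print(looks_like_json_file("/path/to/datax.json"))
print(looks_like_json_file(inline))
```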

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, adds the attributes in _task_custom_attr and removes the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the dependency between the tasks given in tasks and the current task.

It is a wrapper for set_upstream() and set_downstream().

add_attr(**kwargs)

Add attributes to WorkerResource; cpu_quota and memory_max are included for now.

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to send to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, chosen according to the parameter resource_plugin and the parameter workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

CUSTOM_CONFIG = 1
DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'custom_config', 'json', 'xms', 'xmx'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property cpu_quota

Get cpu_quota.

property environment_code: str

Convert environment name to code.

ext: set = {'.json'}
ext_attr: str = '_json'
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property memory_max

Get memory_max.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.
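Since _timeout is declared as timedelta | int while the timeout property returns an int, some normalization has to happen in between. The sketch below shows one plausible way to do it. The unit (whole minutes, rounded up), the rejection of negative integers, and the "OPEN"/"CLOSE" flag strings are all assumptions made for illustration, not values confirmed by this reference.

```python
import math
from datetime import timedelta


def to_timeout_minutes(value) -> int:
    """Normalize a timedelta or int into whole minutes (assumed unit)."""
    if isinstance(value, timedelta):
        # Round up so a partial minute still counts as a full one.
        return math.ceil(value.total_seconds() / 60)
    if value < 0:
        raise ValueError("timeout must not be negative")
    return value


def timeout_flag(value) -> str:
    """Return an assumed flag string: set when the timeout is positive."""
    return "OPEN" if to_timeout_minutes(value) > 0 else "CLOSE"
```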

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.DVCDownload(name: str, repository: str, data_path_in_dvc_repository: str, data_path_in_worker: str, version: str, *args, **kwargs)[source]

Bases: BaseDVC

Task DVC Download object, declare behavior for DVC Download task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.
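The combination described above is plain set arithmetic. The sketch below uses the attribute names listed for this class; final_attrs is a hypothetical helper, and the ignore set is written as set() on the assumption that the {} shown in this reference stands for an empty collection.

```python
def final_attrs(default: set, custom: set, ignore: set) -> set:
    """Union the default and custom attribute names, then drop ignored ones."""
    return (default | custom) - ignore


default_attr = {
    "condition_result",
    "dependence",
    "local_params",
    "resource_list",
    "wait_start_timeout",
}
custom_attr = {"dvc_repository", "dvc_task_type"}
ignore_attr = set()

attrs = final_attrs(default_attr, custom_attr, ignore_attr)
# A subclass that ignores 'dependence' would get a smaller set.
attrs_without_dependence = final_attrs(default_attr, custom_attr, {"dependence"})
```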

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_version'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

dvc_task_type = 'Download'
property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.DVCInit(name: str, repository: str, store_url: str, *args, **kwargs)[source]

Bases: BaseDVC

Task DVC Init object, declare behavior for DVC Init task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
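The lookup-or-create behaviour can be sketched with an in-memory stand-in for the gateway. FakeGateway, its method name and the starting code 1000 are hypothetical; the real gateway call and the code values it produces are not shown in this reference.

```python
import itertools


class FakeGateway:
    """Hypothetical in-memory stand-in for the Java gateway lookup."""

    def __init__(self):
        self._known = {}
        self._next_code = itertools.count(1000)

    def get_code_and_version(self, workflow: str, task: str) -> tuple:
        key = (workflow, task)
        if key not in self._known:
            # Unknown task name: hand out a fresh code with version 0.
            self._known[key] = (next(self._next_code), 0)
        # Known task name: return what was stored earlier.
        return self._known[key]


gateway = FakeGateway()
first = gateway.get_code_and_version("etl", "init_dvc")
again = gateway.get_code_and_version("etl", "init_dvc")
other = gateway.get_code_and_version("etl", "upload")
```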

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_dvc_attr = {'dvc_store_url'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

dvc_task_type = 'Init DVC'
property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.DVCUpload(name: str, repository: str, data_path_in_worker: str, data_path_in_dvc_repository: str, version: str, message: str, *args, **kwargs)[source]

Bases: BaseDVC

Task DVC Upload object, declare behavior for DVC Upload task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_message', 'dvc_version'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

dvc_task_type = 'Upload'
property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.DataX(name: str, datasource_name: str, datatarget_name: str, sql: str, target_table: str, datasource_type: str | None = None, datatarget_type: str | None = None, job_speed_byte: int | None = 0, job_speed_record: int | None = 1000, pre_statements: list[str] | None = None, post_statements: list[str] | None = None, xms: int | None = 1, xmx: int | None = 1, *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task DataX object, declare behavior for DataX task to dolphinscheduler.

It can run DataX jobs against multiple SQL database engines, such as:

  • MySQL

  • Oracle

  • Postgresql

  • SQLServer

The datasource_name and datatarget_name you provide contain the connection information, which determines the database types and database instances between which data is synchronized.

Parameters:
  • name – task name for this task

  • datasource_name – name of the source database from which the DataX task extracts data. It must exist in dolphinscheduler’s datasource center, otherwise the DataX task raises an exception.

  • datatarget_name – name of the target database into which the DataX task loads data. It must exist in dolphinscheduler’s datasource center, otherwise the DataX task raises an exception.

  • sql – SQL statement the DataX task uses to extract data from the source database.

  • target_table – name of the table in the target database into which the DataX task loads data.

  • datasource_type – source database type; dolphinscheduler uses it to find datasource_name in the datasource center.

  • datatarget_type – target database type; dolphinscheduler uses it to find datatarget_name in the datasource center.

  • job_speed_byte – DataX job speed in bytes, default is 0. For more detail, see https://github.com/alibaba/DataX

  • job_speed_record – DataX job speed in records, default is 1000. For more detail, see https://github.com/alibaba/DataX

  • pre_statements – pre statements of the DataX job, executed before the job starts to load data. Default is None.

  • post_statements – post statements of the DataX job, executed after the job finishes loading data. Default is None.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_attr(**kwargs)

Add attributes to WorkerResource; currently cpu_quota and memory_max are included.

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

CUSTOM_CONFIG = 0
DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'custom_config', 'job_speed_byte', 'job_speed_record', 'post_statements', 'pre_statements', 'sql', 'target_table', 'xms', 'xmx'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property cpu_quota

Get cpu_quota.

property environment_code: str

Convert environment name to code.

ext: set = {'.sql'}
ext_attr: str = '_sql'
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property memory_max

Get memory_max.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property source_params: dict

Get source params for datax task.

property target_params: dict

Get target params for datax task.

property task_params: dict

Override Task.task_params for datax task.

The DataX task has some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.
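The override pattern can be sketched as a subclass property that starts from the parent's result and merges in extra keys. BaseTask, SyncTask and the keys source_name and target_name are hypothetical; the keys that the real source_params and target_params return are not listed in this reference.

```python
class BaseTask:
    """Hypothetical parent: builds task_params from _task_custom_attr."""

    _task_custom_attr = set()

    @property
    def task_params(self) -> dict:
        return {name: getattr(self, name) for name in sorted(self._task_custom_attr)}


class SyncTask(BaseTask):
    """Hypothetical child that needs extra, computed entries."""

    _task_custom_attr = {"sql", "target_table"}

    def __init__(self, sql, target_table, source, target):
        self.sql = sql
        self.target_table = target_table
        self._source = source
        self._target = target

    @property
    def source_params(self) -> dict:
        return {"source_name": self._source}  # assumed key name

    @property
    def target_params(self) -> dict:
        return {"target_name": self._target}  # assumed key name

    @property
    def task_params(self) -> dict:
        # Start from the parent's result, then merge the computed parts.
        params = super().task_params
        params.update(self.source_params)
        params.update(self.target_params)
        return params


sync_params = SyncTask("select 1", "t_out", "src_db", "dst_db").task_params
```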

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Dependent(name: str, dependence: DependentOperator, *args, **kwargs)[source]

Bases: BatchTask

Task dependent object, declare behavior for dependent task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for dependent task.

The dependent task has a special attribute, dependence. In most tasks this attribute is None and an empty dict {} is used as the default value. The class attribute _task_custom_attr is not used for it, to avoid the attribute being overwritten.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

Bases: Engine

Task flink object, declare behavior for flink task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_jar_id() int

Get jar id from java gateway, a wrapper for get_resource_info().

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

get_resource_info(program_type, main_package)

Get resource info from java gateway, contains resource id, name.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'app_name', 'deploy_mode', 'flink_version', 'job_manager_memory', 'main_args', 'others', 'parallelism', 'slot', 'task_manager', 'task_manager_memory'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for engine children task.

Child tasks have some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Http(name: str, url: str, http_method: str | None = 'GET', http_params: dict | None = None, http_check_condition: str | None = 'STATUS_CODE_DEFAULT', condition: str | None = None, connect_timeout: int | None = 60000, socket_timeout: int | None = 60000, *args, **kwargs)[source]

Bases: BatchTask

Task HTTP object, declare behavior for HTTP task to dolphinscheduler.

Parameters:
  • name – The name or identifier for the HTTP task.

  • url – The URL endpoint for the HTTP request.

  • http_method – The HTTP method for the request (GET, POST, etc.). Defaults to HttpMethod.GET.

  • http_params – Parameters for the HTTP request. Defaults to None.

  • http_check_condition – Condition for checking the HTTP response status. Defaults to HttpCheckCondition.STATUS_CODE_DEFAULT.

  • condition – Additional condition to evaluate if http_check_condition is not STATUS_CODE_DEFAULT.

  • connect_timeout – Connection timeout for the HTTP request in milliseconds. Defaults to 60000.

  • socket_timeout – Socket timeout for the HTTP request in milliseconds. Defaults to 60000.

Attributes:
  • _task_custom_attr (set) – A set containing custom attributes specific to the Http task, including 'url', 'http_method', 'http_params', and more.

Raises:
  • PyDSParamException – Raised for invalid parameters, such as unsupported HTTP methods or conditions.

Example:

Usage example for creating an HTTP task:

http_task = Http(name="http_task", url="https://api.example.com", http_method="POST", http_params={"key": "value"})
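The kind of parameter checking that leads to the exception above can be sketched without the library. ParamError stands in for PyDSParamException, and the set of accepted methods and the rule that a non-default check condition needs a condition value are assumptions made for illustration.

```python
# Assumed set of supported methods; the library's actual list may differ.
ALLOWED_METHODS = {"GET", "POST", "HEAD", "PUT", "DELETE"}


class ParamError(ValueError):
    """Hypothetical stand-in for PyDSParamException."""


def check_http_params(http_method="GET",
                      http_check_condition="STATUS_CODE_DEFAULT",
                      condition=None) -> None:
    """Raise ParamError for an unsupported method or a missing condition."""
    if http_method not in ALLOWED_METHODS:
        raise ParamError(f"unsupported HTTP method: {http_method}")
    if http_check_condition != "STATUS_CODE_DEFAULT" and not condition:
        raise ParamError("condition is required for a non-default check")


check_http_params(http_method="POST")  # passes silently
```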

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, add the attributes in _task_custom_attr and remove the attributes in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the tasks passed in the tasks parameter as upstream or downstream of the current task.

This is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not already exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'condition', 'connect_timeout', 'http_check_condition', 'http_method', 'http_params', 'socket_timeout', 'url'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property http_params

Property to convert http_params using ParameterHelper when accessed.

property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Kubernetes(name: str, image: str, namespace: str, min_cpu_cores: float, min_memory_space: float, *args, **kwargs)[source]

Bases: BatchTask

Task Kubernetes object, declare behavior for Kubernetes task to dolphinscheduler.

Parameters:
  • name – task name

  • image – the registry url for image.

  • namespace – the namespace for running Kubernetes task.

  • min_cpu_cores – min CPU requirement for running Kubernetes task.

  • min_memory_space – min memory requirement for running Kubernetes task.

  • params_map – It is a local user-defined parameter for Kubernetes task.

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.
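
The combination described for _get_attr() can be pictured as plain set arithmetic. The sketch below is only an illustration: the helper final_attr is hypothetical and is not part of pydolphinscheduler, while the three sets are copied from the Kubernetes class attributes listed in this reference.

```python
# Sets copied from the Kubernetes class attributes shown in this reference.
task_default_attr = {
    "condition_result",
    "dependence",
    "local_params",
    "resource_list",
    "wait_start_timeout",
}
task_custom_attr = {"image", "min_cpu_cores", "min_memory_space", "namespace"}
task_ignore_attr = set()  # documented as `{}`; treated here as an empty set


def final_attr(default, custom, ignore):
    """Hypothetical helper: union default and custom names, then drop ignored ones."""
    return (default | custom) - ignore


attrs = final_attr(task_default_attr, task_custom_attr, task_ignore_attr)
print(sorted(attrs))
```

With an empty ignore set the result is simply the union of the default and custom names.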

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.
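
The relationship between set_upstream(), set_downstream() and _set_deps() can be sketched with a toy class. MiniTask and its attribute names are hypothetical stand-ins chosen for illustration; the real Task also records TaskRelation objects on the workflow, which is left out here.

```python
from collections.abc import Sequence


class MiniTask:
    """Hypothetical stand-in for a task; it only tracks neighbor codes."""

    def __init__(self, code):
        self.code = code
        self.upstream_codes = set()
        self.downstream_codes = set()

    def _set_deps(self, tasks, upstream=True):
        # Accept either a single task or a sequence of tasks.
        if not isinstance(tasks, Sequence):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self.upstream_codes.add(task.code)
                task.downstream_codes.add(self.code)
            else:
                self.downstream_codes.add(task.code)
                task.upstream_codes.add(self.code)

    def set_upstream(self, tasks):
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks):
        self._set_deps(tasks, upstream=False)


extract, transform, load = MiniTask(1), MiniTask(2), MiniTask(3)
extract.set_downstream(transform)  # extract runs before transform
load.set_upstream([transform])     # transform runs before load
```

Each call updates both ends of the edge, so the two tasks stay consistent whichever direction is declared.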

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'image', 'min_cpu_cores', 'min_memory_space', 'namespace'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.
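
The types above (_timeout: timedelta | int, timeout: int, timeout_flag: str) suggest a normalization step followed by a flag. The sketch below is a guess at that shape, not the library code: the minute unit, the rounding up, and the "OPEN"/"CLOSE" strings are all assumptions, and both function names are hypothetical.

```python
import math
from datetime import timedelta


def timeout_minutes(timeout):
    """Normalize a timedelta or int to whole minutes (the unit is an assumption)."""
    if isinstance(timeout, timedelta):
        # Round up so a partial minute still counts as a full one.
        return math.ceil(timeout.total_seconds() / 60)
    return timeout


def timeout_flag(timeout):
    """Return an assumed flag string: set when the timeout is positive."""
    return "OPEN" if timeout_minutes(timeout) > 0 else "CLOSE"


print(timeout_minutes(timedelta(seconds=90)), timeout_flag(0))
```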

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.MLFlowProjectsAutoML(name: str, data_path: str, automl_tool: str | None = 'flaml', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow projects object, declare behavior for AutoML task to dolphinscheduler.

Parameters:
  • name – task name

  • data_path – Data path of the MLflow project; supports a git address or a directory on the worker.

  • automl_tool – The AutoML tool used, currently supports autosklearn and flaml.

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • experiment_name – MLflow experiment name, default is empty

  • model_name – MLflow model name, default is empty

  • parameters – MLflow project parameters, default is empty

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
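
A rough picture of what get_define(camel_attr=True) implies: pick the attributes named in a set and, optionally, rename the snake_case keys to camelCase. The helpers snake_to_camel and build_define, the FakeTask class and its values are all hypothetical; the library's own conversion may differ in detail.

```python
def snake_to_camel(name: str) -> str:
    """Convert 'task_type' to 'taskType'; single words are unchanged."""
    head, *rest = name.split("_")
    return head + "".join(part.title() for part in rest)


def build_define(obj, define_attr, camel_attr=True):
    """Collect the named attributes from obj, optionally camel-casing the keys."""
    result = {}
    for attr in sorted(define_attr):
        if hasattr(obj, attr):
            key = snake_to_camel(attr) if camel_attr else attr
            result[key] = getattr(obj, attr)
    return result


class FakeTask:
    """Made-up object carrying three of the names found in _DEFINE_ATTR."""

    name = "train"
    task_type = "EXAMPLE"
    fail_retry_times = 0


define_attr = {"name", "task_type", "fail_retry_times"}
print(build_define(FakeTask(), define_attr))
```

Passing camel_attr=False keeps the Python-style keys, which is convenient when inspecting the definition locally.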

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_mlflow_attr = {'automl_tool', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

mlflow_job_type = 'AutoML'
mlflow_task_type = 'MLflow Projects'
property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.MLFlowProjectsBasicAlgorithm(name: str, data_path: str, algorithm: str | None = 'lightgbm', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', search_params: str | None = '', *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow projects object, declare behavior for BasicAlgorithm task to dolphinscheduler.

Parameters:
  • name – task name

  • data_path – Data path of the MLflow project; supports a git address or a directory on the worker.

  • algorithm – The algorithm to use; currently supports LR, SVM, LightGBM and XGBoost in their scikit-learn form.

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • experiment_name – MLflow experiment name, default is empty

  • model_name – MLflow model name, default is empty

  • parameters – MLflow project parameters, default is empty

  • search_params – Whether to search the parameters, default is empty

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
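
The new-or-existing behaviour described for gen_code_and_version() can be modeled with an in-memory registry. FakeGateway, its method name and the starting code 1000 are hypothetical; the real lookup goes through the Java gateway and is not shown in this reference.

```python
import itertools


class FakeGateway:
    """In-memory stand-in for the Java gateway; not the real client."""

    def __init__(self):
        self._next_code = itertools.count(1000)  # arbitrary starting code
        self._tasks = {}  # (workflow_name, task_name) -> (code, version)

    def get_code_and_version(self, workflow_name, task_name):
        key = (workflow_name, task_name)
        if key not in self._tasks:
            # Unknown task name: allocate a new code with version 0.
            self._tasks[key] = (next(self._next_code), 0)
        # Known task name: return the stored code and version.
        return self._tasks[key]


gateway = FakeGateway()
first = gateway.get_code_and_version("demo", "train")
again = gateway.get_code_and_version("demo", "train")
other = gateway.get_code_and_version("demo", "evaluate")
print(first, again, other)
```

Asking twice for the same task name yields the same pair, which is what lets a workflow be resubmitted without duplicating its tasks.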

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_mlflow_attr = {'algorithm', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model', 'search_params'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

mlflow_job_type = 'BasicAlgorithm'
mlflow_task_type = 'MLflow Projects'
property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.MLFlowProjectsCustom(name: str, repository: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', parameters: str | None = '', version: str | None = 'master', *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow projects object, declare behavior for MLflow Custom projects task to dolphinscheduler.

Parameters:
  • name – task name

  • repository – Repository URL of the MLflow project; supports a git address or a directory on the worker. For a project in a subdirectory, append # followed by the path (same as mlflow run), for example https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native.

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • experiment_name – MLflow experiment name, default is empty

  • parameters – MLflow project parameters, default is empty

  • version – MLflow project version, default is master
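
The # convention in the repository parameter can be illustrated by splitting the value into a location and an optional subdirectory. split_repository is a hypothetical helper written for this example; how DolphinScheduler or mlflow parse the value internally is not covered by this reference.

```python
def split_repository(repository: str):
    """Split 'location#subdirectory' into its parts; subdirectory may be None."""
    location, _, subdirectory = repository.partition("#")
    return location, subdirectory or None


print(split_repository("https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native"))
print(split_repository("/opt/project"))
```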

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_mlflow_attr = {'experiment_name', 'mlflow_job_type', 'mlflow_project_repository', 'mlflow_project_version', 'params'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

mlflow_job_type = 'CustomProject'
mlflow_task_type = 'MLflow Projects'
property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.MLflowModels(name: str, model_uri: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', deploy_mode: str | None = 'DOCKER', port: int | None = 7000, *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow models object, declare behavior for MLflow models task to dolphinscheduler.

Deploy machine learning models in diverse serving environments.

Parameters:
  • name – task name

  • model_uri – Model URI of MLflow; supports the models:/<model_name>/suffix format and the runs:/ format. See https://mlflow.org/docs/latest/tracking.html#artifact-stores

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • deploy_mode – MLflow deploy mode; supports MLFLOW and DOCKER, default is DOCKER

  • port – deploy port, default is 7000
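
The two model_uri formats named above can be told apart by their prefix. The function model_uri_scheme is a hypothetical pre-check, not something the library is documented to perform, and the sample URIs are made up.

```python
def model_uri_scheme(model_uri: str) -> str:
    """Return 'models' or 'runs' for the two documented URI formats."""
    for scheme in ("models", "runs"):
        if model_uri.startswith(scheme + ":/"):
            return scheme
    raise ValueError(f"unsupported model_uri: {model_uri!r}")


print(model_uri_scheme("models:/my_model/Production"))
print(model_uri_scheme("runs:/abc123/model"))
```

A check like this could run before the task is defined, so that a mistyped URI is caught early rather than at deploy time.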

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_child_task_mlflow_attr = {'deploy_model_key', 'deploy_port', 'deploy_type'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

mlflow_task_type = 'MLflow Models'
property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Return task params.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.MR(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', app_name: str | None = None, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]

Bases: Engine

Task mr object, declare behavior for mr task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_jar_id() int

Get jar id from java gateway, a wrapper for get_resource_info().

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

get_resource_info(program_type, main_package)

Get resource info from java gateway, contains resource id, name.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'app_name', 'main_args', 'others'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for engine child tasks.

Child tasks have some special attributes for task_params, and it would be odd to set them directly as Python properties, so Task.task_params is overridden here.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs)[source]

Bases: BatchTask

Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler.

Parameters:
  • name – task name

  • zookeeper – OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.

  • zookeeper_path – OpenMLDB cluster zookeeper path, e.g. /openmldb.

  • execute_mode – Determines the initial mode, offline or online. You can switch it within the SQL statement itself.

  • sql – SQL statement.
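
The constructor names zookeeper and zookeeper_path correspond to the shorter zk and zk_path entries in _task_custom_attr listed for this class. The sketch below shows that renaming with a hypothetical helper; the check on execute_mode is an assumption added for illustration and is not documented library behaviour.

```python
VALID_EXECUTE_MODES = ("offline", "online")  # the two modes named above


def openmldb_custom_params(zookeeper, zookeeper_path, execute_mode, sql):
    """Hypothetical helper: map constructor arguments to the custom attribute names."""
    if execute_mode not in VALID_EXECUTE_MODES:
        raise ValueError(f"execute_mode must be one of {VALID_EXECUTE_MODES}")
    return {
        "zk": zookeeper,
        "zk_path": zookeeper_path,
        "execute_mode": execute_mode,
        "sql": sql,
    }


# Address and path are the examples given in the parameter list; the SQL is made up.
params = openmldb_custom_params("127.0.0.1:2181", "/openmldb", "offline", "SELECT 1;")
print(params)
```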

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes sent to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, selected according to the parameters resource_plugin and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'execute_mode', 'sql', 'zk', 'zk_path'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Procedure(name: str, datasource_name: str, method: str, datasource_type: str | None = None, *args, **kwargs)[source]

Bases: BatchTask

Task Procedure object, declare behavior for Procedure task to dolphinscheduler.

It runs a database procedure job on one of several SQL-like engines, such as:

  • ClickHouse

  • DB2

  • HIVE

  • MySQL

  • Oracle

  • Postgresql

  • Presto

  • SQLServer

The datasource_name you provide contains the connection information; it determines which database type and database instance run this SQL.

_get_attr() set[str]

Get final task task_params attribute.

Starting from _task_default_attr, add the attributes in _task_custom_attr and remove those in _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set parameter tasks dependent to current task.

This is the shared helper behind set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.
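
How set_upstream(), set_downstream() and _set_deps() relate can be sketched with a small stand-in class. MiniTask is hypothetical and only tracks task codes; the assumption that both public methods delegate to one helper with an upstream flag, and that the relation is recorded on both tasks, comes from the signatures shown here rather than from the library source.

```python
class MiniTask:
    """Hypothetical stand-in for a task; it only tracks dependency codes."""

    def __init__(self, code: int):
        self.code = code
        self._upstream_task_codes = set()
        self._downstream_task_codes = set()

    def _set_deps(self, tasks, upstream: bool = True) -> None:
        # Accept either a single task or a sequence of tasks.
        if isinstance(tasks, MiniTask):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                # Assumed: the relation is recorded on both ends.
                self._upstream_task_codes.add(task.code)
                task._downstream_task_codes.add(self.code)
            else:
                self._downstream_task_codes.add(task.code)
                task._upstream_task_codes.add(self.code)

    def set_upstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks) -> None:
        self._set_deps(tasks, upstream=False)


extract, transform, load = MiniTask(1), MiniTask(2), MiniTask(3)
transform.set_upstream(extract)
transform.set_downstream([load])
print(transform._upstream_task_codes, transform._downstream_task_codes)
```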

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'method'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property datasource: dict

Get datasource for procedure task.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for the procedure task.

The procedure task has some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Python(name: str, definition: str | LambdaType, *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task Python object, declare behavior for Python task to dolphinscheduler.

The Python task supports two types for the definition parameter, as the examples below show.

Using a str for definition:

python_task = Python(name="str_type", definition="print('Hello Python task.')")

Or using a Python callable for definition:

def foo():
    print("Hello Python task.")

python_task = Python(name="str_type", definition=foo)
Parameters:
  • name – The name of the Python task.

  • definition – The Python script to execute, given as a string, or the Python callable to execute.

_build_exe_str() str[source]

Build executable string from given definition.

The attribute self.definition is usually a function, so the function must be called after it has been converted to a string. The simplest way to call a function is the func() syntax, and that is what is used here.
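
A minimal sketch of this idea, assuming the function source is read with inspect.getsource and a trailing call is appended. The standalone function build_exe_str below is hypothetical and is not the method's actual body.

```python
import inspect
import textwrap
from types import FunctionType


def build_exe_str(definition) -> str:
    """Return a script string; for a function, append a call to it."""
    if isinstance(definition, FunctionType):
        # Take the function's source text and add a trailing "name()" call.
        source = textwrap.dedent(inspect.getsource(definition))
        return f"{source}\n{definition.__name__}()\n"
    # A str definition is assumed to be a complete script already.
    return definition


# Usage (run from a file, because inspect.getsource needs the source text):
#
#     def foo():
#         print("Hello Python task.")
#
#     print(build_exe_str(foo))
```

For a callable, the resulting string holds the function definition followed by foo(), so running the string executes the function once.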

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, append the attributes from _task_custom_attr and subtract the attributes from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the given tasks as upstream or downstream dependencies of the current task.

It is a wrapper for set_upstream() and set_downstream().

add_attr(**kwargs)

Add attributes to WorkerResource. Currently this includes cpu_quota and memory_max.

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

Usage can be as simple as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.
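
The filtering by _DEFINE_ATTR and the camel_attr switch can be illustrated with a standalone sketch. The helpers snake_to_camel and get_define and the DemoTask class are hypothetical stand-ins, and the exact key format the library produces is an assumption.

```python
def snake_to_camel(name: str) -> str:
    """Convert snake_case to lowerCamelCase (hypothetical helper)."""
    head, *tail = name.split("_")
    return head + "".join(part.title() for part in tail)


def get_define(obj, define_attr: set, camel_attr: bool = True) -> dict:
    """Collect only the attributes named in define_attr from obj."""
    result = {}
    for attr in sorted(define_attr):
        key = snake_to_camel(attr) if camel_attr else attr
        result[key] = getattr(obj, attr, None)
    return result


class DemoTask:
    # Hypothetical stand-in carrying a few of the names found in _DEFINE_ATTR.
    name = "demo"
    task_type = "PYTHON"
    fail_retry_times = 0


define = get_define(DemoTask(), {"name", "task_type", "fail_retry_times"})
print(define)
```

Passing camel_attr=False in this sketch keeps the original snake_case keys.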

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'raw_script'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property cpu_quota

Get cpu_quota.

property environment_code: str

Convert environment name to code.

ext: set = {'.py'}
ext_attr: str | types.FunctionType = '_definition'
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property memory_max

Get memory_max.

property raw_script: str

Get python task define attribute raw_script.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines the attributes named in _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Pytorch(name: str, script: str, script_params: str = '', project_path: str | None = '.', is_create_environment: bool | None = False, python_command: str | None = '${PYTHON_HOME}', python_env_tool: str | None = 'conda', requirements: str | None = 'requirements.txt', conda_python_version: str | None = '3.7', *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.

See also: DolphinScheduler Pytorch Task Plugin

Parameters:
  • name – task name

  • script – Entry to the Python script file that you want to run.

  • script_params – Input parameters at run time.

  • project_path – The path to the project. Default “.”.

  • is_create_environment – Whether to create a new environment. Default False.

  • python_command – The path to the python command. Default “${PYTHON_HOME}”.

  • python_env_tool – The python environment tool. Default “conda”.

  • requirements – The path to the requirements.txt file. Default “requirements.txt”.

  • conda_python_version – The python version of conda environment. Default “3.7”.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, append the attributes from _task_custom_attr and subtract the attributes from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the given tasks as upstream or downstream dependencies of the current task.

It is a wrapper for set_upstream() and set_downstream().

add_attr(**kwargs)

Add attributes to WorkerResource. Currently this includes cpu_quota and memory_max.

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

Usage can be as simple as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'conda_python_version', 'is_create_environment', 'other_params', 'python_command', 'python_env_tool', 'python_path', 'requirements', 'script', 'script_params'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property cpu_quota

Get cpu_quota.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property memory_max

Get memory_max.

property other_params

Return other params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines the attributes named in _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.
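
Because _timeout is typed as timedelta | int while timeout is an int and timeout_flag is a str, some normalization has to happen between them. The sketch below shows one plausible way to do it. The minute unit, the rounding up, and the "OPEN"/"CLOSE" strings are assumptions made for illustration, and both helper names are hypothetical.

```python
import math
from datetime import timedelta


def timeout_minutes(timeout) -> int:
    """Normalize a timedelta or int to whole minutes (unit is an assumption)."""
    if isinstance(timeout, timedelta):
        # Round up so that a partial minute still counts.
        return math.ceil(timeout.total_seconds() / 60)
    return int(timeout)


def timeout_flag(timeout) -> str:
    """Return an illustrative flag string: set when the timeout is positive."""
    return "OPEN" if timeout_minutes(timeout) > 0 else "CLOSE"


print(timeout_minutes(timedelta(hours=1, seconds=30)), timeout_flag(timedelta(hours=1)))
print(timeout_minutes(0), timeout_flag(0))
```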

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.SageMaker(name: str, sagemaker_request_json: str, *args, **kwargs)[source]

Bases: BatchTask

Task SageMaker object, declare behavior for SageMaker task to dolphinscheduler.

Parameters:
  • name – A unique, meaningful string for the SageMaker task.

  • sagemaker_request_json – Request parameters of StartPipelineExecution, see also AWS API

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, append the attributes from _task_custom_attr and subtract the attributes from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the given tasks as upstream or downstream dependencies of the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

Usage can be as simple as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
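
The lookup-or-create behavior can be pictured with an in-memory stand-in for the gateway. FakeGateway, its method name, and the starting code value are all hypothetical; the real gateway call and its code format are not shown in this reference.

```python
import itertools


class FakeGateway:
    """Hypothetical in-memory stand-in for the Java gateway."""

    def __init__(self):
        self._tasks = {}  # (workflow, task name) -> (code, version)
        self._next_code = itertools.count(1000)

    def get_code_and_version(self, workflow: str, task_name: str) -> tuple:
        key = (workflow, task_name)
        if key not in self._tasks:
            # Unknown task: hand out a fresh code with version 0.
            self._tasks[key] = (next(self._next_code), 0)
        # Known task: return the stored code and version unchanged.
        return self._tasks[key]


gateway = FakeGateway()
first = gateway.get_code_and_version("etl", "train")
again = gateway.get_code_and_version("etl", "train")
other = gateway.get_code_and_version("etl", "deploy")
print(first, again, other)
```

Asking twice for the same task name returns the same pair, which is the property the description relies on.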

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'sagemaker_request_json'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines the attributes named in _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Shell(name: str, command: str, *args, **kwargs)[source]

Bases: WorkerResourceMixin, BatchTask

Task shell object, declare behavior for shell task to dolphinscheduler.

Parameters:
  • name – A unique, meaningful string for the shell task.

  • command

    One or more commands to run in this task.

    It can be a single command:

    Shell(name=..., command="echo task shell")

    or several commands that together perform a more complex task:

    command = '''echo task shell step 1;
    echo task shell step 2;
    echo task shell step 3
    '''
    
    Shell(name=..., command=command)
    

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, append the attributes from _task_custom_attr and subtract the attributes from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the given tasks as upstream or downstream dependencies of the current task.

It is a wrapper for set_upstream() and set_downstream().

add_attr(**kwargs)

Add attributes to WorkerResource. Currently this includes cpu_quota and memory_max.

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

Usage can be as simple as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.
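
One way to picture how get_plugin() and get_content() could cooperate for a task such as Shell, whose ext is {'.sh', '.zsh'}, is sketched below. Everything in it is a hypothetical stand-in: the DictPlugin class, its read_file method, the helper names, and in particular the assumed precedence of the task-level plug-in over the workflow-level one.

```python
class DictPlugin:
    """Hypothetical resource plug-in that serves file content from a dict."""

    def __init__(self, files: dict):
        self.files = files

    def read_file(self, path: str) -> str:
        return self.files[path]


def resolve_plugin(task_plugin, workflow_plugin):
    """Prefer the task-level plug-in, then fall back to the workflow's (assumed order)."""
    if task_plugin is not None:
        return task_plugin
    if workflow_plugin is not None:
        return workflow_plugin
    raise ValueError("no resource plug-in configured on the task or the workflow")


def load_content(value: str, ext: set, plugin) -> str:
    """Read the file when value ends with a known extension, else keep value."""
    if value.endswith(tuple(ext)):
        return plugin.read_file(value)
    return value


workflow_plugin = DictPlugin({"hello.sh": "echo hello"})
plugin = resolve_plugin(None, workflow_plugin)
print(load_content("hello.sh", {".sh", ".zsh"}, plugin))
print(load_content("echo inline", {".sh", ".zsh"}, plugin))
```

In this sketch a value that looks like a script path is replaced by the file content, while an inline command is passed through untouched.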

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'raw_script'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property cpu_quota

Get cpu_quota.

property environment_code: str

Convert environment name to code.

ext: set = {'.sh', '.zsh'}
ext_attr: str = '_raw_script'
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property memory_max

Get memory_max.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines the attributes named in _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Spark(name: str, main_class: str, main_package: str, program_type: ProgramType | None = 'SCALA', deploy_mode: DeployMode | None = 'cluster', app_name: str | None = None, driver_cores: int | None = 1, driver_memory: str | None = '512M', num_executors: int | None = 2, executor_memory: str | None = '2G', executor_cores: int | None = 2, main_args: str | None = None, others: str | None = None, *args, **kwargs)[source]

Bases: Engine

Task spark object, declare behavior for spark task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, append the attributes from _task_custom_attr and subtract the attributes from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the given tasks as upstream or downstream dependencies of the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

Usage can be as simple as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_jar_id() int

Get jar id from java gateway, a wrapper for get_resource_info().

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.

get_resource_info(program_type, main_package)

Get resource info from java gateway, contains resource id, name.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'app_name', 'deploy_mode', 'driver_cores', 'driver_memory', 'executor_cores', 'executor_memory', 'main_args', 'num_executors', 'others'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for tasks that are children of Engine.

These child tasks have some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Sql(name: str, datasource_name: str, sql: str, datasource_type: str | None = None, sql_type: str | None = None, pre_statements: str | Sequence[str] | None = None, post_statements: str | Sequence[str] | None = None, display_rows: int | None = 10, *args, **kwargs)[source]

Bases: BatchTask

Task SQL object, declare behavior for SQL task to dolphinscheduler.

It runs a SQL job on one of several SQL-like engines, such as:

  • ClickHouse
  • DB2
  • HIVE
  • MySQL
  • Oracle
  • Postgresql
  • Presto
  • SQLServer

The datasource_name you provide carries the connection information, and it determines which database type and database instance run this SQL.

Parameters:
  • name – SQL task name

  • datasource_name – The datasource name in dolphinscheduler. The name must exist and must refer to an online datasource rather than a test one.

  • sql – The SQL statement or script to run. This parameter supports the resource plugin.

  • sql_type – The SQL type, that is, whether the statement is a select query or not. If not provided, it is detected automatically from the statement by pydolphinscheduler.tasks.sql.Sql.sql_type(). It can also be set manually, using SqlType.SELECT for a query statement or SqlType.NOT_SELECT for a non-query statement.

  • pre_statements – SQL statements to be executed before the main SQL statement.

  • post_statements – SQL statements to be executed after the main SQL statement.

  • display_rows – The number of result rows to display in the SQL task log. Default is 10.

_get_attr() set[str]

Get final task task_params attribute.

Based on _task_default_attr, append the attributes from _task_custom_attr and subtract the attributes from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the given tasks as upstream or downstream dependencies of the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

Usage can be as simple as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

Usage can be as simple as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

The attribute self._DEFINE_ATTR determines which attributes are included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in.

The plug-in is chosen according to the resource_plugin parameter and workflow.resource_plugin.

static get_stm_list(stm: str | Sequence[str]) list[str][source]

Convert statements to a list of str.

Parameters:

stm – statements string

Returns:

statements list
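
A minimal sketch of such a normalization, matching the str | Sequence[str] input and list[str] output of the signature. The function name to_statement_list is hypothetical, and the handling of empty input and of a single string (kept whole rather than split) is an assumption.

```python
def to_statement_list(stm) -> list:
    """Normalize a statement argument to a list of strings."""
    if not stm:
        # None or empty input yields no statements (assumed behavior).
        return []
    if isinstance(stm, str):
        # A single string is treated as one statement and is not split.
        return [stm]
    return list(stm)


print(to_statement_list("CREATE TABLE t (id INT)"))
print(to_statement_list(("DELETE FROM t", "VACUUM t")))
print(to_statement_list(None))
```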

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'display_rows', 'post_statements', 'pre_statements', 'sql', 'sql_type'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property datasource: dict

Get datasource for sql task.

property environment_code: str

Convert environment name to code.

ext: set = {'.sql'}
ext_attr: str = '_sql'
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property sql_type: str

Determine the SQL type, returning either SELECT or NOT_SELECT.

If param_sql_type is not specified, a regular expression is used to detect the type of the SQL statement. If param_sql_type is specified, it overrides the regular-expression detection.
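
The detection logic can be sketched as follows. The SqlKind enum is a hypothetical stand-in for the SELECT and NOT_SELECT choices, the keyword list is illustrative only, and the library's actual regular expression may differ.

```python
import re
from enum import Enum


class SqlKind(Enum):
    """Hypothetical stand-in for the SELECT / NOT_SELECT choices."""

    SELECT = "SELECT"
    NOT_SELECT = "NOT_SELECT"


# Keywords that mark a statement as modifying data or schema (illustrative list).
_NOT_SELECT = re.compile(
    r"^\s*(insert|update|delete|drop|truncate|alter|create)\b", re.IGNORECASE
)


def detect_sql_kind(sql: str, param_sql_type=None) -> SqlKind:
    """Return the explicit type if given, else guess from the statement text."""
    if param_sql_type is not None:
        # An explicit parameter always wins over the regexp check.
        return param_sql_type
    return SqlKind.NOT_SELECT if _NOT_SELECT.match(sql) else SqlKind.SELECT


print(detect_sql_kind("select * from t"))
print(detect_sql_kind("  INSERT INTO t VALUES (1)"))
print(detect_sql_kind("select 1", SqlKind.NOT_SELECT))
```

A keyword check like this is only a heuristic, which is a good reason to pass the type explicitly when a statement is unusual.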

property task_params: dict

Override Task.task_params for the sql task.

The sql task has some special attributes in task_params that would be awkward to set directly as Python properties, so Task.task_params is overridden here.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.SubWorkflow(name: str, workflow_name: str, *args, **kwargs)[source]

Bases: BatchTask

Task SubWorkflow object, declare behavior for SubWorkflow task to dolphinscheduler.

_get_attr() set[str]

Get final task task_params attribute.

Base on _task_default_attr, append attribute from _task_custom_attr and subtract attribute from _task_ignore_attr.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the dependency between the given tasks and the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called as simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called as simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from the Java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.
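
The get-or-create behavior described above can be sketched as follows. This is only an illustration under stated assumptions: the in-memory dictionary stands in for the Java gateway, and the starting code value, the function signature, and the lookup key are all hypothetical.

```python
import itertools

# Hypothetical in-memory stand-in for the Java gateway's task registry.
_registry = {}
_next_code = itertools.count(1000)  # arbitrary starting code, for illustration only


def gen_code_and_version(workflow_name: str, task_name: str) -> tuple:
    """Return (code, version), creating a new entry with version 0 if none exists."""
    key = (workflow_name, task_name)
    if key not in _registry:
        # Unknown task name: allocate a fresh code and start at version 0.
        _registry[key] = (next(_next_code), 0)
    # Known task name: return the stored code and version unchanged.
    return _registry[key]
```

Calling the function twice with the same workflow and task name returns the same pair, which is the property the docstring describes.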

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes should be included when the object communicates with the Java gateway server.
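
A rough sketch of how an attribute whitelist and the camel_attr flag could interact, assuming camel_attr=True renames snake_case keys to camelCase. The helpers snake_to_camel and build_definition and the DemoTask class are hypothetical; the library's own conversion may differ.

```python
def snake_to_camel(name: str) -> str:
    """Convert a snake_case name such as task_params to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def build_definition(obj: object, define_attr: set, camel_attr: bool = True) -> dict:
    """Collect the listed attributes from obj, optionally renaming keys to camelCase."""
    result = {}
    for attr in sorted(define_attr):
        key = snake_to_camel(attr) if camel_attr else attr
        # Attributes missing on the object are reported as None in this sketch.
        result[key] = getattr(obj, attr, None)
    return result


class DemoTask:
    """Hypothetical stand-in object with a few of the documented attributes."""

    name = "demo"
    task_type = "SHELL"
    fail_retry_times = 0
```

For example, build_definition(DemoTask(), {"name", "task_type"}) produces the keys name and taskType, while passing camel_attr=False keeps task_type as written.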

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to parameter resource_plugin and parameter workflow.resource_plugin.

get_workflow_info(workflow_name: str) dict[source]

Get workflow info from the Java gateway, including the workflow id, name, and code.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.
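
To illustrate how set_upstream() and set_downstream() relate to _set_deps(), here is a self-contained sketch in which both public methods delegate to one shared helper. The Node class and its attribute names are hypothetical stand-ins, not the library's Task class.

```python
from typing import Sequence, Union


class Node:
    """Hypothetical minimal task that tracks upstream and downstream codes."""

    def __init__(self, code: int):
        self.code = code
        self.upstream_codes = set()
        self.downstream_codes = set()

    def _set_deps(self, tasks: Union["Node", Sequence["Node"]], upstream: bool = True) -> None:
        # Accept either a single task or a sequence of tasks.
        if isinstance(tasks, Node):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self.upstream_codes.add(task.code)
                task.downstream_codes.add(self.code)
            else:
                self.downstream_codes.add(task.code)
                task.upstream_codes.add(self.code)

    def set_upstream(self, tasks: Union["Node", Sequence["Node"]]) -> None:
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks: Union["Node", Sequence["Node"]]) -> None:
        self._set_deps(tasks, upstream=False)


extract, transform, load = Node(1), Node(2), Node(3)
extract.set_downstream(transform)  # extract runs before transform
load.set_upstream([transform])     # transform runs before load
```

Recording each relation on both tasks keeps the upstream and downstream sets consistent regardless of which side declares the dependency.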

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {'process_definition_code'}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property process_definition_code: str

Get workflow code, a wrapper for get_workflow_info().

This property cannot be renamed to workflow_code, because process_definition_code is a keyword used by dolphinscheduler itself.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict | None

Get task parameter object.

The result combines _task_custom_attr and custom_attr.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

class pydolphinscheduler.tasks.Switch(name: str, condition: SwitchCondition, *args, **kwargs)[source]

Bases: BatchTask

Task switch object, declare behavior for switch task to dolphinscheduler.

A workflow parameter or at least one task local parameter must be set if a switch task is used in the workflow.

_get_attr() set[str]

Get the final set of task_params attributes.

Based on _task_default_attr, append attributes from _task_custom_attr and subtract attributes from _task_ignore_attr.
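
The combination rule amounts to plain set arithmetic. The sketch below applies it to the attribute sets listed for Switch in this reference; the function name get_attr and the exact order of operations are assumptions for illustration.

```python
# Values copied from the Switch attribute listing in this reference.
task_default_attr = {
    "condition_result",
    "dependence",
    "local_params",
    "resource_list",
    "wait_start_timeout",
}
task_custom_attr = set()  # Switch declares no custom attributes
task_ignore_attr = {"condition_result", "dependence"}


def get_attr(default: set, custom: set, ignore: set) -> set:
    """Union the default and custom attributes, then drop the ignored ones."""
    return (default | custom) - ignore


final_attr = get_attr(task_default_attr, task_custom_attr, task_ignore_attr)
```

Under these assumptions final_attr contains only local_params, resource_list and wait_start_timeout.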

_set_dep() None[source]

Set downstream according to parameter condition.

_set_deps(tasks: Task | Sequence[Task], upstream: bool = True) None

Set the dependency between the given tasks and the current task.

It is a wrapper for set_upstream() and set_downstream().

add_in(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add input parameters.

Parameters:
  • name – name of the input parameter.

  • value – value of the input parameter.

It can be called as simply as:

task.add_in("a")
task.add_in("b", 123)
task.add_in("c", bool)
task.add_in("d", ParameterType.LONG(123))
add_out(name: str, value: int | str | float | bool | BaseDataType | None = None)

Add output parameters.

Parameters:
  • name – name of the output parameter.

  • value – value of the output parameter.

It can be called as simply as:

task.add_out("a")
task.add_out("b", 123)
task.add_out("c", bool)
task.add_out("d", ParameterType.LONG(123))
gen_code_and_version() tuple

Generate task code and version from the Java gateway.

If the task name does not yet exist in the workflow, the Java gateway generates a new code and a version equal to 0; otherwise it returns the existing code and version.

get_content()

Get the file content according to the resource plugin.

get_define(camel_attr: bool = True) dict

Get the object definition attributes to communicate to the Java gateway server.

Uses the attribute self._DEFINE_ATTR to determine which attributes should be included when the object communicates with the Java gateway server.

get_define_custom(camel_attr: bool = True, custom_attr: set = None) dict

Get object definition attribute by given attr set.

get_plugin()

Return the resource plug-in, according to parameter resource_plugin and parameter workflow.resource_plugin.

set_downstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as downstream to current task.

set_upstream(tasks: Task | Sequence[Task]) None

Set parameter tasks as upstream to current task.

DEFAULT_CONDITION_RESULT = {'failedNode': [''], 'successNode': ['']}
_DEFAULT_ATTR: dict = {}
_DEFINE_ATTR: set = {'code', 'delay_time', 'description', 'environment_code', 'fail_retry_interval', 'fail_retry_times', 'flag', 'is_cache', 'name', 'task_execute_type', 'task_group_id', 'task_group_priority', 'task_params', 'task_priority', 'task_type', 'timeout', 'timeout_flag', 'timeout_notify_strategy', 'version', 'worker_group'}
_KEY_ATTR: set = {'description', 'name'}
_downstream_task_codes: set[int]
_task_custom_attr: set = {}
_task_default_attr = {'condition_result', 'dependence', 'local_params', 'resource_list', 'wait_start_timeout'}
_task_ignore_attr: set = {'condition_result', 'dependence'}
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
property condition_result: dict

Get attribute condition_result.

property environment_code: str

Convert environment name to code.

ext: set = None
ext_attr: str | types.FunctionType = None
property is_cache: str

Whether the cache is being set or not.

property local_params

Convert local params.

property resource_list: list[dict[str, Resource]]

Get task define attribute resource_list.

property task_params: dict

Override Task.task_params for switch task.

Switch tasks have a special attribute switch; in most other tasks this attribute is None and an empty dict {} is used as the default value. The class attribute _task_custom_attr is not used here, to avoid overriding attributes.

property timeout: int

Get attribute timeout.

property timeout_flag: str

Whether the timeout attribute is being set or not.

property user_name: str | None

Return username of workflow.

property workflow: Workflow | None

Get attribute workflow.

Constants

Constants for pydolphinscheduler.

class pydolphinscheduler.constants.DefaultTaskCodeNum[source]

Bases: str

Constants and default value for default task code number.

DEFAULT = 1
class pydolphinscheduler.constants.Delimiter[source]

Bases: str

Constants for delimiter.

BAR = '-'
COLON = ':'
DASH = '/'
DIRECTION = '->'
UNDERSCORE = '_'
class pydolphinscheduler.constants.IsCache[source]

Bases: str

Constants for Cache.

NO = 'NO'
YES = 'YES'
class pydolphinscheduler.constants.JavaGatewayDefault[source]

Bases: str

Constants and default value for java gateway.

RESULT_DATA = 'data'
RESULT_MESSAGE_KEYWORD = 'msg'
RESULT_MESSAGE_SUCCESS = 'success'
RESULT_STATUS_KEYWORD = 'status'
RESULT_STATUS_SUCCESS = 'SUCCESS'
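
These keys suggest a response shape with a status, a message, and a data payload. The sketch below shows one way such a response could be checked. The dictionary layout, the helper name unwrap_result, and the use of RuntimeError are assumptions for illustration, not the library's actual result checker.

```python
# Key names and the success value are copied from the constants above.
RESULT_STATUS_KEYWORD = "status"
RESULT_STATUS_SUCCESS = "SUCCESS"
RESULT_MESSAGE_KEYWORD = "msg"
RESULT_DATA = "data"


def unwrap_result(result: dict):
    """Return the payload of a successful response, or raise with its message."""
    if result.get(RESULT_STATUS_KEYWORD) != RESULT_STATUS_SUCCESS:
        # Surface the gateway message when the status is not SUCCESS.
        raise RuntimeError(result.get(RESULT_MESSAGE_KEYWORD, "unknown gateway error"))
    return result.get(RESULT_DATA)
```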
class pydolphinscheduler.constants.Keyword[source]

Bases: str

Constants for keyword.

DATASOURCE_ID = 'id'
DATASOURCE_TYPE = 'type'
class pydolphinscheduler.constants.ResourceKey[source]

Bases: str

Constants for key of resource.

NAME = 'resourceName'
class pydolphinscheduler.constants.Symbol[source]

Bases: str

Constants for symbol.

BLANK = ' '
COMMA = ','
POINT = '.'
SLASH = '/'
UNDERLINE = '_'
class pydolphinscheduler.constants.TaskFlag[source]

Bases: str

Constants for task flag.

NO = 'NO'
YES = 'YES'
class pydolphinscheduler.constants.TaskPriority[source]

Bases: str

Constants for task priority.

HIGH = 'HIGH'
HIGHEST = 'HIGHEST'
LOW = 'LOW'
LOWEST = 'LOWEST'
MEDIUM = 'MEDIUM'
class pydolphinscheduler.constants.TaskTimeoutFlag[source]

Bases: str

Constants for task timeout flag.

OFF = 'CLOSE'
ON = 'OPEN'
class pydolphinscheduler.constants.TaskType[source]

Bases: str

Constants for task type; they also show which task types are currently supported.

CONDITIONS = 'CONDITIONS'
DATAX = 'DATAX'
DEPENDENT = 'DEPENDENT'
DVC = 'DVC'
HTTP = 'HTTP'
KUBERNETES = 'K8S'
MLFLOW = 'MLFLOW'
MR = 'MR'
OPENMLDB = 'OPENMLDB'
PROCEDURE = 'PROCEDURE'
PYTHON = 'PYTHON'
PYTORCH = 'PYTORCH'
SAGEMAKER = 'SAGEMAKER'
SHELL = 'SHELL'
SPARK = 'SPARK'
SQL = 'SQL'
SUB_WORKFLOW = 'SUB_PROCESS'
SWITCH = 'SWITCH'
class pydolphinscheduler.constants.Time[source]

Bases: str

Constants for date.

FMT_DASH_DATE = '%Y/%m/%d'
FMT_NO_COLON_TIME = '%H%M%S'
FMT_SHORT_DATE = '%Y%m%d'
FMT_STD_DATE = '%Y-%m-%d'
FMT_STD_TIME = '%H:%M:%S'
LEN_SHORT_DATE = 8
LEN_STD_DATE = 10
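
The format strings above are standard strftime patterns, so their effect can be checked with the standard library alone. The sketch below redeclares the values locally instead of importing the package, and the timestamp is an arbitrary example.

```python
from datetime import datetime

# Format strings and lengths copied from the Time constants listed above.
FMT_STD_DATE = "%Y-%m-%d"
FMT_DASH_DATE = "%Y/%m/%d"
FMT_SHORT_DATE = "%Y%m%d"
FMT_STD_TIME = "%H:%M:%S"
FMT_NO_COLON_TIME = "%H%M%S"
LEN_STD_DATE = 10
LEN_SHORT_DATE = 8

moment = datetime(2024, 1, 2, 3, 4, 5)  # arbitrary example timestamp

std_date = moment.strftime(FMT_STD_DATE)            # "2024-01-02"
dash_date = moment.strftime(FMT_DASH_DATE)          # "2024/01/02"
short_date = moment.strftime(FMT_SHORT_DATE)        # "20240102"
std_time = moment.strftime(FMT_STD_TIME)            # "03:04:05"
no_colon_time = moment.strftime(FMT_NO_COLON_TIME)  # "030405"
```

The two length constants match the rendered widths of the standard and short date formats for four-digit years.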
class pydolphinscheduler.constants.Version[source]

Bases: str

Constants for version match.

DS = 'dolphinscheduler'
FILE_NAME = 'version_ext'

Exceptions

Exceptions for pydolphinscheduler.

exception pydolphinscheduler.exceptions.PyDSBaseException[source]

Bases: Exception

Base exception for pydolphinscheduler.

exception pydolphinscheduler.exceptions.PyDSConfException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler configuration error.

exception pydolphinscheduler.exceptions.PyDSJavaGatewayException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler Java gateway error.

exception pydolphinscheduler.exceptions.PyDSParamException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler parameter verification error.

exception pydolphinscheduler.exceptions.PyDSTaskNoFoundException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler workflow task not found error.

exception pydolphinscheduler.exceptions.PyDSWorkflowNotAssignException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler workflow not assigned error.

exception pydolphinscheduler.exceptions.PyResPluginException[source]

Bases: PyDSBaseException

Exception for pydolphinscheduler resource plugin error.
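
Because every exception above derives from PyDSBaseException, a single except clause on the base class catches any of them. The sketch below uses local stand-in classes that mirror the documented hierarchy so it runs without the package; validate_priority and safe_priority are hypothetical helpers.

```python
class PyDSBaseException(Exception):
    """Stand-in mirroring pydolphinscheduler.exceptions.PyDSBaseException."""


class PyDSParamException(PyDSBaseException):
    """Stand-in mirroring pydolphinscheduler.exceptions.PyDSParamException."""


def validate_priority(value: str) -> str:
    """Hypothetical validator that raises the parameter exception on bad input."""
    allowed = {"HIGHEST", "HIGH", "MEDIUM", "LOW", "LOWEST"}
    if value not in allowed:
        raise PyDSParamException(f"Unknown task priority: {value}")
    return value


def safe_priority(value: str, default: str = "MEDIUM") -> str:
    """Fall back to a default when any pydolphinscheduler-style error is raised."""
    try:
        return validate_priority(value)
    except PyDSBaseException:
        # Catching the base class also covers PyDSParamException.
        return default
```

Catching the base class is convenient at the outer boundary of a script, while catching a specific subclass such as PyDSParamException keeps unrelated failures visible.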