Source code for pydolphinscheduler.core.task

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""DolphinScheduler Task and TaskRelation object."""

from __future__ import annotations

import copy
import types
import warnings
from collections.abc import Sequence
from datetime import timedelta
from logging import getLogger

from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
    Delimiter,
    IsCache,
    ResourceKey,
    Symbol,
    TaskFlag,
    TaskPriority,
    TaskTimeoutFlag,
)
from pydolphinscheduler.core.parameter import BaseDataType, Direction, ParameterHelper
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.core.workflow import Workflow, WorkflowContext
from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models import Base
from pydolphinscheduler.utils.date import timedelta2timeout

logger = getLogger(__name__)


class TaskRelation(Base):
    """TaskRelation object, describe the relation of exactly two tasks."""

    # Add attr `_KEY_ATTR` to overwrite :func:`__eq__`; it makes the set
    # `Task.workflow._task_relations` work correctly.
    _KEY_ATTR = {
        "pre_task_code",
        "post_task_code",
    }

    _DEFINE_ATTR = {
        "pre_task_code",
        "post_task_code",
    }

    _DEFAULT_ATTR = {
        "name": "",
        "preTaskVersion": 1,
        "postTaskVersion": 1,
        "conditionType": 0,
        "conditionParams": {},
    }

    def __init__(
        self,
        pre_task_code: int,
        post_task_code: int,
        name: str | None = None,
    ):
        super().__init__(name)
        self.pre_task_code = pre_task_code
        self.post_task_code = post_task_code

    def __hash__(self):
        return hash(f"{self.pre_task_code} {Delimiter.DIRECTION} {self.post_task_code}")
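
# A minimal illustration (comment only, not part of the original module) of why
# `_KEY_ATTR` overrides equality above: relations with the same pre/post task
# codes hash and compare equal, so the `Workflow._task_relations` set
# deduplicates them even when their names differ.
#
#     relations = {
#         TaskRelation(pre_task_code=1, post_task_code=2, name="a -> b"),
#         TaskRelation(pre_task_code=1, post_task_code=2, name="duplicate"),
#     }
#     assert len(relations) == 1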


class Task(Base):
    """Task object, the base class for all concrete task types.

    :param name: The name of the task. Node names within the same workflow must be unique.
    :param task_type: The type of the task.
    :param description: default None
    :param flag: default TaskFlag.YES
    :param task_priority: default TaskPriority.MEDIUM
    :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
    :param environment_name: default None
    :param task_group_id: Identifier of the task group used to restrict the parallelism
        of task instance runs, default 0.
    :param task_group_priority: Priority within the same task group; the higher the
        value, the higher the priority. Default 0.
    :param delay_time: default 0
    :param fail_retry_times: default 0
    :param fail_retry_interval: default 1
    :param timeout_notify_strategy: default None
    :param timeout: Timeout attribute for the task, in minutes. A task is considered
        timed out when its running time exceeds this value. When the data type is
        :class:`datetime.timedelta`, it will be converted to an int (in minutes).
        Default ``None``.
    :param resource_list: default None
    :param wait_start_timeout: default None
    :param condition_result: default None
    :param resource_plugin: default None
    :param is_cache: default False
    :param input_params: default None, input parameters, {param_name: param_value}
    :param output_params: default None, output parameters, {param_name: param_value}
    """

    _DEFINE_ATTR = {
        "name",
        "code",
        "version",
        "task_type",
        "task_params",
        "description",
        "flag",
        "task_priority",
        "worker_group",
        "environment_code",
        "delay_time",
        "fail_retry_times",
        "fail_retry_interval",
        "task_group_id",
        "task_group_priority",
        "timeout_flag",
        "timeout_notify_strategy",
        "timeout",
        "is_cache",
    }

    # Task default attributes which go into the `task_params` property.
    _task_default_attr = {
        "local_params",
        "resource_list",
        "dependence",
        "wait_start_timeout",
        "condition_result",
    }

    # Task attributes removed from `_task_default_attr`; they do not go into the
    # `task_params` property.
    _task_ignore_attr: set = set()

    # Task custom attributes defined in subclasses, appended to the `task_params` property.
    _task_custom_attr: set = set()

    ext: set = None
    ext_attr: str | types.FunctionType = None

    DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}

    def __init__(
        self,
        name: str,
        task_type: str,
        description: str | None = None,
        flag: str | None = TaskFlag.YES,
        task_priority: str | None = TaskPriority.MEDIUM,
        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
        environment_name: str | None = None,
        task_group_id: int | None = 0,
        task_group_priority: int | None = 0,
        delay_time: int | None = 0,
        fail_retry_times: int | None = 0,
        fail_retry_interval: int | None = 1,
        timeout_notify_strategy: str | None = None,
        timeout: timedelta | int | None = None,
        workflow: Workflow | None = None,
        resource_list: list | None = None,
        dependence: dict | None = None,
        wait_start_timeout: dict | None = None,
        condition_result: dict | None = None,
        resource_plugin: ResourcePlugin | None = None,
        is_cache: bool | None = False,
        input_params: dict | None = None,
        output_params: dict | None = None,
        *args,
        **kwargs,
    ):
        super().__init__(name, description)
        self.task_type = task_type
        self.flag = flag
        self._is_cache = is_cache
        self.task_priority = task_priority
        self.worker_group = worker_group
        self._environment_name = environment_name
        self.task_group_id = task_group_id
        self.task_group_priority = task_group_priority
        self.fail_retry_times = fail_retry_times
        self.fail_retry_interval = fail_retry_interval
        self.delay_time = delay_time
        self.timeout_notify_strategy = timeout_notify_strategy
        self._timeout: timedelta | int = timeout
        self._workflow = None
        self._input_params = input_params or {}
        self._output_params = output_params or {}
        if "process_definition" in kwargs:
            warnings.warn(
                "The `process_definition` parameter is deprecated, please use `workflow` instead.",
                DeprecationWarning,
            )
            self.workflow = kwargs.pop("process_definition")
        else:
            self.workflow: Workflow = workflow or WorkflowContext.get()
        if "local_params" in kwargs:
            warnings.warn(
                "The `local_params` parameter is deprecated, please use "
                "`input_params` and `output_params` instead.",
                DeprecationWarning,
            )
            self._local_params = kwargs.get("local_params")
        self._upstream_task_codes: set[int] = set()
        self._downstream_task_codes: set[int] = set()
        self._task_relation: set[TaskRelation] = set()
        # Assign code and version only after _workflow and workflow are declared.
        self.code, self.version = self.gen_code_and_version()
        # Add task to workflow; maybe we could move this into the workflow property later.
        if self.workflow is not None and self.code not in self.workflow.tasks:
            self.workflow.add_task(self)
        else:
            logger.warning(
                "Task code %d already in workflow, prohibit re-add task.",
                self.code,
            )

        # Attributes for task params.
        self._resource_list = resource_list or []
        self.dependence = dependence or {}
        self.wait_start_timeout = wait_start_timeout or {}
        self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
        self.resource_plugin = resource_plugin
        self.get_content()

    @property
    def workflow(self) -> Workflow | None:
        """Get attribute workflow."""
        return self._workflow

    @workflow.setter
    def workflow(self, workflow: Workflow | None):
        """Set attribute workflow."""
        self._workflow = workflow

    @property
    def timeout(self) -> int:
        """Get attribute timeout."""
        if isinstance(self._timeout, int):
            if self._timeout < 0:
                raise PyDSParamException("The timeout value must be greater than 0")
            return self._timeout
        return timedelta2timeout(self._timeout) if self._timeout else 0

    @property
    def timeout_flag(self) -> str:
        """Whether the timeout attribute is set or not."""
        return TaskTimeoutFlag.ON if self._timeout else TaskTimeoutFlag.OFF

    @property
    def is_cache(self) -> str:
        """Whether the cache is set or not."""
        if isinstance(self._is_cache, bool):
            return IsCache.YES if self._is_cache else IsCache.NO
        else:
            raise PyDSParamException("is_cache must be a bool")

    @property
    def resource_list(self) -> list[dict[str, Resource]]:
        """Get task define attribute `resource_list`."""
        resources = set()
        for res in self._resource_list:
            if isinstance(res, str):
                resources.add(
                    Resource(
                        name=res, user_name=self.user_name
                    ).get_fullname_from_database()
                )
            elif isinstance(res, dict) and ResourceKey.NAME in res:
                warnings.warn(
                    "`resource_list` should be defined using List[str] with resource paths; "
                    "the use of ids to define resources will be removed in version 3.2.0.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                resources.add(res.get(ResourceKey.NAME))
        return [{ResourceKey.NAME: r} for r in resources]

    @property
    def user_name(self) -> str | None:
        """Return username of workflow."""
        if self.workflow:
            return self.workflow.user.name
        else:
            raise PyDSParamException("`user_name` cannot be empty.")

    @property
    def condition_result(self) -> dict:
        """Get attribute condition_result."""
        return self._condition_result

    @condition_result.setter
    def condition_result(self, condition_result: dict | None):
        """Set attribute condition_result."""
        self._condition_result = condition_result
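
    # Timeout handling sketch (illustrative comment, not part of the original
    # module): `timeout` accepts either an int (minutes) or a
    # `datetime.timedelta`, which `timedelta2timeout` converts to whole minutes,
    # while `timeout_flag` reports whether any timeout was set. The task name
    # and "SHELL" task type below are assumptions for the example.
    #
    #     task = Task(name="demo", task_type="SHELL", timeout=timedelta(minutes=5))
    #     assert task.timeout == 5
    #     assert task.timeout_flag == TaskTimeoutFlag.ON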

    def _get_attr(self) -> set[str]:
        """Get the final set of `task_params` attributes.

        Based on `_task_default_attr`, append attributes from `_task_custom_attr`
        and subtract attributes from `_task_ignore_attr`.
        """
        attr = copy.deepcopy(self._task_default_attr)
        attr -= self._task_ignore_attr
        attr |= self._task_custom_attr
        return attr

    @property
    def task_params(self) -> dict | None:
        """Get the task parameter object.

        Combines the attributes collected by :func:`_get_attr` into the final
        task parameter dict.
        """
        custom_attr = self._get_attr()
        return self.get_define_custom(custom_attr=custom_attr)

    def get_plugin(self):
        """Return the resource plugin.

        Prefers the task-level `resource_plugin` parameter and falls back to
        `workflow.resource_plugin`.
        """
        if self.resource_plugin is None:
            if self.workflow.resource_plugin is not None:
                return self.workflow.resource_plugin
            else:
                raise PyResPluginException(
                    "The execution command of this task is a file, but the resource plugin is empty"
                )
        else:
            return self.resource_plugin

    def get_content(self):
        """Get the file content according to the resource plugin."""
        if self.ext_attr is None and self.ext is None:
            return
        _ext_attr = getattr(self, self.ext_attr)

        if _ext_attr is not None:
            if isinstance(_ext_attr, str) and _ext_attr.endswith(tuple(self.ext)):
                res = self.get_plugin()
                content = res.read_file(_ext_attr)
                setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content)
            else:
                if self.resource_plugin is not None or (
                    self.workflow is not None
                    and self.workflow.resource_plugin is not None
                ):
                    index = _ext_attr.rfind(Symbol.POINT)
                    if index != -1:
                        raise ValueError(
                            f"This task does not support files with suffix {_ext_attr[index:]}, "
                            f"only supports {Symbol.COMMA.join(str(suf) for suf in self.ext)}"
                        )
                setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), _ext_attr)
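
    # Subclassing sketch (hypothetical class, not part of this module): a
    # concrete task type lists its extra attributes in `_task_custom_attr` so
    # that `_get_attr` folds them into `task_params`, and sets `ext`/`ext_attr`
    # so that `get_content` loads the script from a file via the resource
    # plugin when the attribute value ends with a supported suffix.
    #
    #     class DemoShellTask(Task):
    #         _task_custom_attr = {"raw_script"}
    #         ext = {".sh"}
    #         ext_attr = "_raw_script"
    #
    #         def __init__(self, name: str, command: str, *args, **kwargs):
    #             self._raw_script = command
    #             super().__init__(name, "SHELL", *args, **kwargs)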

    def __hash__(self):
        return hash(self.code)

    def __lshift__(self, other: Task | Sequence[Task]):
        """Implement Task << Task."""
        self.set_upstream(other)
        return other

    def __rshift__(self, other: Task | Sequence[Task]):
        """Implement Task >> Task."""
        self.set_downstream(other)
        return other

    def __rrshift__(self, other: Task | Sequence[Task]):
        """Called for [Task] >> Task because lists don't have an __rshift__ operator."""
        self.__lshift__(other)
        return self

    def __rlshift__(self, other: Task | Sequence[Task]):
        """Called for [Task] << Task because lists don't have an __lshift__ operator."""
        self.__rshift__(other)
        return self

    def _set_deps(self, tasks: Task | Sequence[Task], upstream: bool = True) -> None:
        """Set the given tasks as dependencies of the current task.

        It is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
        """
        if not isinstance(tasks, Sequence):
            tasks = [tasks]
        for task in tasks:
            if upstream:
                self._upstream_task_codes.add(task.code)
                task._downstream_task_codes.add(self.code)

                if self._workflow:
                    task_relation = TaskRelation(
                        pre_task_code=task.code,
                        post_task_code=self.code,
                        name=f"{task.name} {Delimiter.DIRECTION} {self.name}",
                    )
                    self.workflow._task_relations.add(task_relation)
            else:
                self._downstream_task_codes.add(task.code)
                task._upstream_task_codes.add(self.code)

                if self._workflow:
                    task_relation = TaskRelation(
                        pre_task_code=self.code,
                        post_task_code=task.code,
                        name=f"{self.name} {Delimiter.DIRECTION} {task.name}",
                    )
                    self.workflow._task_relations.add(task_relation)

    def set_upstream(self, tasks: Task | Sequence[Task]) -> None:
        """Set the given tasks as upstream of the current task."""
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks: Task | Sequence[Task]) -> None:
        """Set the given tasks as downstream of the current task."""
        self._set_deps(tasks, upstream=False)
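
    # Dependency-setting sketch (task names are illustrative): the operator
    # overloads above mirror `set_upstream`/`set_downstream`, so the following
    # pairs are equivalent:
    #
    #     task1 >> task2            # task1.set_downstream(task2)
    #     task1 << task2            # task1.set_upstream(task2)
    #     [task1, task2] >> task3   # task3.set_upstream([task1, task2])
    #     task1 >> [task2, task3]   # task1.set_downstream([task2, task3])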

    # TODO: better to generate codes in bulk mode when the workflow is run, submitted, or started.
    def gen_code_and_version(self) -> tuple:
        """Generate task code and version from the java gateway.

        If the task name does not yet exist in the workflow, the java gateway
        generates a new code with a version equal to 0; otherwise it returns
        the existing code and version.
        """
        # TODO get code from specific project workflow and task name
        result = gateway.get_code_and_version(
            self.workflow._project, self.workflow.name, self.name
        )
        # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
        # gateway_result_checker(result)
        return result.get("code"), result.get("version")

    @property
    def environment_code(self) -> str:
        """Convert environment name to code."""
        if self._environment_name is None:
            return None
        return gateway.query_environment_info(self._environment_name)

    @property
    def local_params(self):
        """Convert local params."""
        local_params = (
            copy.deepcopy(self._local_params) if hasattr(self, "_local_params") else []
        )
        local_params.extend(
            ParameterHelper.convert_params(self._input_params, Direction.IN)
        )
        local_params.extend(
            ParameterHelper.convert_params(self._output_params, Direction.OUT)
        )
        return local_params

    def add_in(
        self,
        name: str,
        value: int | str | float | bool | BaseDataType | None = None,
    ):
        """Add an input parameter.

        :param name: name of the input parameter.
        :param value: value of the input parameter.

        It can be used as simply as::

            task.add_in("a")
            task.add_in("b", 123)
            task.add_in("c", bool)
            task.add_in("d", ParameterType.LONG(123))
        """
        self._input_params[name] = value

    def add_out(
        self,
        name: str,
        value: int | str | float | bool | BaseDataType | None = None,
    ):
        """Add an output parameter.

        :param name: name of the output parameter.
        :param value: value of the output parameter.

        It can be used as simply as::

            task.add_out("a")
            task.add_out("b", 123)
            task.add_out("c", bool)
            task.add_out("d", ParameterType.LONG(123))
        """
        self._output_params[name] = value
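
    # Conversion sketch (the exact record layout comes from
    # ParameterHelper.convert_params and is an assumption here, not confirmed
    # by this file): each `add_in`/`add_out` entry becomes one DolphinScheduler
    # localParams record exposed through the `local_params` property, roughly:
    #
    #     task.add_in("a", 123)
    #     task.add_out("b")
    #     # task.local_params -> e.g.
    #     # [{"prop": "a", "direct": "IN", ...}, {"prop": "b", "direct": "OUT", ...}]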


class BatchTask(Task):
    """Task with the ``BATCH`` execute type.

    It accepts the same parameters as :class:`Task`; see that class for the
    full parameter descriptions.
    """

    _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}

    def __init__(
        self,
        name: str,
        task_type: str,
        description: str | None = None,
        flag: str | None = TaskFlag.YES,
        task_priority: str | None = TaskPriority.MEDIUM,
        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
        environment_name: str | None = None,
        task_group_id: int | None = 0,
        task_group_priority: int | None = 0,
        delay_time: int | None = 0,
        fail_retry_times: int | None = 0,
        fail_retry_interval: int | None = 1,
        timeout_notify_strategy: str | None = None,
        timeout: timedelta | int | None = None,
        workflow: Workflow | None = None,
        resource_list: list | None = None,
        dependence: dict | None = None,
        wait_start_timeout: dict | None = None,
        condition_result: dict | None = None,
        resource_plugin: ResourcePlugin | None = None,
        is_cache: bool | None = False,
        input_params: dict | None = None,
        output_params: dict | None = None,
        *args,
        **kwargs,
    ):
        super().__init__(
            name,
            task_type,
            description,
            flag,
            task_priority,
            worker_group,
            environment_name,
            task_group_id,
            task_group_priority,
            delay_time,
            fail_retry_times,
            fail_retry_interval,
            timeout_notify_strategy,
            timeout,
            workflow,
            resource_list,
            dependence,
            wait_start_timeout,
            condition_result,
            resource_plugin,
            is_cache,
            input_params,
            output_params,
            *args,
            **kwargs,
        )
        self.task_execute_type = "BATCH"


class StreamTask(Task):
    """Task with the ``STREAM`` execute type.

    It accepts the same parameters as :class:`Task`; see that class for the
    full parameter descriptions.
    """

    _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"}

    def __init__(
        self,
        name: str,
        task_type: str,
        description: str | None = None,
        flag: str | None = TaskFlag.YES,
        task_priority: str | None = TaskPriority.MEDIUM,
        worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP,
        environment_name: str | None = None,
        task_group_id: int | None = 0,
        task_group_priority: int | None = 0,
        delay_time: int | None = 0,
        fail_retry_times: int | None = 0,
        fail_retry_interval: int | None = 1,
        timeout_notify_strategy: str | None = None,
        timeout: timedelta | int | None = None,
        workflow: Workflow | None = None,
        resource_list: list | None = None,
        dependence: dict | None = None,
        wait_start_timeout: dict | None = None,
        condition_result: dict | None = None,
        resource_plugin: ResourcePlugin | None = None,
        is_cache: bool | None = False,
        input_params: dict | None = None,
        output_params: dict | None = None,
        *args,
        **kwargs,
    ):
        super().__init__(
            name,
            task_type,
            description,
            flag,
            task_priority,
            worker_group,
            environment_name,
            task_group_id,
            task_group_priority,
            delay_time,
            fail_retry_times,
            fail_retry_interval,
            timeout_notify_strategy,
            timeout,
            workflow,
            resource_list,
            dependence,
            wait_start_timeout,
            condition_result,
            resource_plugin,
            is_cache,
            input_params,
            output_params,
            *args,
            **kwargs,
        )
        self.task_execute_type = "STREAM"
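

# End-to-end usage sketch (illustrative comment, with assumptions: a reachable
# java gateway and the shell task plugin from `pydolphinscheduler.tasks.shell`;
# all names are examples, not part of this module):
#
#     from pydolphinscheduler.core.workflow import Workflow
#     from pydolphinscheduler.tasks.shell import Shell
#
#     with Workflow(name="demo-workflow") as workflow:
#         start = Shell(name="start", command="echo start")
#         end = Shell(name="end", command="echo end")
#         start >> end
#         workflow.submit()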