Source code for pydolphinscheduler.tasks.condition

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Task Conditions."""

from __future__ import annotations

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import BatchTask, Task
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.models.base import Base


class Status(Base):
    """Base class of Condition task status.

    It is the parent class of :class:`SUCCESS` and :class:`FAILURE`. It provides
    the status name and :func:`get_define` to its sub classes.

    :param tasks: the tasks this status item refers to. Each of them must be an
        instance of ``Task`` (or of a sub class), which is verified lazily in
        :func:`get_define`.
    """

    def __init__(self, *tasks):
        super().__init__(f"Condition.{self.status_name()}")
        self.tasks = tasks

    def __repr__(self) -> str:
        # NOTE: this is not a debugging representation. ``ConditionOperator``
        # uses the returned value as the name of the attribute in which it
        # stores the definitions collected from its arguments.
        return "depend_item_list"

    @classmethod
    def status_name(cls) -> str:
        """Get name for Status or its sub class, which is the upper-cased class name."""
        return cls.__name__.upper()

    def get_define(self, camel_attr: bool = True) -> list:
        """Get status definition attribute communicate to Java gateway server.

        :param camel_attr: not used by this implementation, the keys of the returned
            items are always ``depTaskCode`` and ``status``.
        :return: one ``{"depTaskCode": ..., "status": ...}`` dict per task, in the
            order the tasks were passed to the constructor.
        :raises PyDSParamException: if any element of ``tasks`` is not a ``Task``.
        """
        status = self.status_name()
        content = []
        for task in self.tasks:
            if not isinstance(task, Task):
                # Build the message eagerly: exceptions do not apply %-formatting
                # to their arguments, so placeholders would reach the user as-is.
                raise PyDSParamException(
                    f"{status} only accept class Task or sub class Task, "
                    f"but get {type(task)}"
                )
            content.append({"depTaskCode": task.code, "status": status})
        return content
class SUCCESS(Status):
    """Condition status item named ``SUCCESS``, sub class of :class:`Status`.

    :param tasks: the tasks this status item refers to.
    """

    def __init__(self, *tasks: Task) -> None:
        # Nothing specific to do here, the parent class keeps the tasks and
        # derives the status name from the name of this class.
        super().__init__(*tasks)
class FAILURE(Status):
    """Condition status item named ``FAILURE``, sub class of :class:`Status`.

    :param tasks: the tasks this status item refers to.
    """

    def __init__(self, *tasks: Task) -> None:
        # Nothing specific to do here, the parent class keeps the tasks and
        # derives the status name from the name of this class.
        super().__init__(*tasks)
class ConditionOperator(Base):
    """Set ConditionTask or ConditionOperator with specific operator.

    An operator groups either :class:`Status` items or other
    :class:`ConditionOperator` objects, but never a mix of both.

    :param args: the :class:`Status` or :class:`ConditionOperator` objects to combine.
    """

    # Attributes which are always part of the definition returned by ``get_define``.
    _DEFINE_ATTR = {
        "relation",
    }

    def __init__(self, *args):
        super().__init__(self.__class__.__name__)
        self.args = args

    def __repr__(self) -> str:
        # NOTE: this is not a debugging representation. ``set_define_attr`` uses
        # the returned value as the name of the attribute in which a parent
        # operator stores the definitions of its nested operators.
        return "depend_task_list"

    @classmethod
    def operator_name(cls) -> str:
        """Get operator name in different class, which is the upper-cased class name."""
        return cls.__name__.upper()

    @property
    def relation(self) -> str:
        """Get operator name in different class, for function :func:`get_define`."""
        return self.operator_name()

    def set_define_attr(self) -> str:
        """Set attribute to function :func:`get_define`.

        It is a wrapper for both `And` and `Or` operator. The definitions of all
        arguments are collected into a list, which is stored on this object in the
        attribute named by ``repr()`` of the arguments: ``depend_item_list`` for
        :class:`Status` arguments and ``depend_task_list`` for operator arguments.

        :return: the name of the attribute that has been set.
        :raises PyDSParamException: if there is no argument at all, if an argument
            is neither a :class:`Status` nor a :class:`ConditionOperator`, or if the
            arguments are not all of the same kind.
        """
        result = []
        attr = None
        for condition in self.args:
            if not isinstance(condition, (Status, ConditionOperator)):
                # Messages are built eagerly: exceptions do not apply
                # %-formatting to their arguments.
                raise PyDSParamException(
                    f"Condition {self.relation} operator parameter support "
                    f"ConditionTask and ConditionOperator but got {type(condition)}."
                )
            if attr is None:
                # The first argument decides which kind all the others must be.
                attr = repr(condition)
            elif repr(condition) != attr:
                raise PyDSParamException(
                    f"Condition {self.relation} operator parameter "
                    "only support same type."
                )

            if attr == "depend_item_list":
                # A status returns a list of items, flatten it into the result.
                result.extend(condition.get_define())
            else:
                # An operator returns a single dict, keep it as one entry.
                result.append(condition.get_define())

        if attr is None:
            # Without this guard ``setattr`` below fails with an unrelated
            # ``TypeError`` because the attribute name would be ``None``.
            raise PyDSParamException(
                f"Condition {self.relation} operator requires at least one parameter."
            )
        setattr(self, attr, result)
        return attr

    def get_define(self, camel_attr=True) -> dict:
        """Overwrite Base.get_define to get task Condition specific get define.

        :param camel_attr: kept to match the signature being overwritten. It is not
            forwarded, the definition is always requested with ``camel_attr=True``.
        :return: the definition made of ``relation`` plus the attribute set by
            :func:`set_define_attr`.
        :raises PyDSParamException: see :func:`set_define_attr`.
        """
        attr = self.set_define_attr()
        dependent_define_attr = self._DEFINE_ATTR.union({attr})
        return super().get_define_custom(
            camel_attr=True, custom_attr=dependent_define_attr
        )
class And(ConditionOperator):
    """Operator And for task condition.

    Its relation name is ``AND``. The arguments have to be either all
    :class:`Status` objects or all :class:`ConditionOperator` objects.

    :param args: the objects to combine with this operator.
    """

    def __init__(self, *args: Status | ConditionOperator) -> None:
        # Nothing specific to do here, the parent class keeps the arguments and
        # derives the relation name from the name of this class.
        super().__init__(*args)
class Or(ConditionOperator):
    """Operator Or for task condition.

    Its relation name is ``OR``. The arguments have to be either all
    :class:`Status` objects or all :class:`ConditionOperator` objects.

    :param args: the objects to combine with this operator.
    """

    def __init__(self, *args: Status | ConditionOperator) -> None:
        # Nothing specific to do here, the parent class keeps the arguments and
        # derives the relation name from the name of this class.
        super().__init__(*args)
class Condition(BatchTask):
    """Task condition object, declare behavior for condition task to dolphinscheduler.

    :param name: name of the task.
    :param condition: the operator describing which task statuses are checked.
    :param success_task: task reported as ``successNode`` in :attr:`condition_result`.
    :param failed_task: task reported as ``failedNode`` in :attr:`condition_result`.
    :param args: extra positional arguments handed over to the parent class.
    :param kwargs: extra keyword arguments handed over to the parent class.
    """

    def __init__(
        self,
        name: str,
        condition: ConditionOperator,
        success_task: Task,
        failed_task: Task,
        *args,
        **kwargs,
    ):
        super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
        self.condition = condition
        self.success_task = success_task
        self.failed_task = failed_task
        # The dependencies can only be declared once the three attributes
        # above are available.
        self._set_dep()

    def _set_dep(self) -> None:
        """Declare the upstream and downstream tasks of this condition task.

        The upstream tasks are the ones held by the items of every operator nested
        in ``condition``. Arguments of ``condition`` which are not operators
        themselves are skipped. Both ``success_task`` and ``failed_task`` become
        downstream tasks.
        """
        checked_tasks = [
            task
            for operator in self.condition.args
            if isinstance(operator, ConditionOperator)
            for status in operator.args
            for task in status.tasks
        ]
        self.set_upstream(checked_tasks)
        self.set_downstream([self.success_task, self.failed_task])

    @property
    def condition_result(self) -> dict:
        """Get condition result define for java gateway."""
        success_codes = [self.success_task.code]
        failed_codes = [self.failed_task.code]
        return {"successNode": success_codes, "failedNode": failed_codes}

    @property
    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
        """Override Task.task_params for Condition task.

        The parameters of the parent class are extended with the key ``dependence``,
        which holds the definition of ``condition``. The class attribute
        `_task_custom_attr` is deliberately not used for this, to avoid attribute
        cover.
        """
        task_definition = super().task_params
        task_definition["dependence"] = self.condition.get_define()
        return task_definition