# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task Conditions."""
from __future__ import annotations
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import BatchTask, Task
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.models.base import Base
[docs]
class Status(Base):
    """Base class of Condition task status.

    It is the parent class of :class:`SUCCESS` and :class:`FAILURE`. It provides the
    status name and :func:`get_define` to its sub classes.

    :param tasks: Tasks whose status this condition item refers to. They are stored
        as given and only validated when :func:`get_define` is called.
    """

    def __init__(self, *tasks):
        super().__init__(f"Condition.{self.status_name()}")
        self.tasks = tasks

    def __repr__(self) -> str:
        # ConditionOperator.set_define_attr uses this value as the attribute name
        # under which the definitions of its arguments are stored.
        return "depend_item_list"

    @classmethod
    def status_name(cls) -> str:
        """Get name for Status or its sub class, which is the upper-cased class name."""
        return cls.__name__.upper()

    def get_define(self, camel_attr: bool = True) -> list:
        """Get status definition attribute communicate to Java gateway server.

        :param camel_attr: Not used by this implementation, the keys of the returned
            items are fixed.
        :return: One ``{"depTaskCode": <task code>, "status": <status name>}`` dict
            for each task in ``self.tasks``, in the same order.
        :raises PyDSParamException: If an item of ``self.tasks`` is not an instance of
            :class:`Task`.
        """
        status = self.status_name()
        content = []
        for task in self.tasks:
            if not isinstance(task, Task):
                # Exceptions do not interpolate ``%s`` placeholders from extra
                # arguments, so the message has to be formatted before raising.
                raise PyDSParamException(
                    f"{status} only accept class Task or sub class Task, "
                    f"but get {type(task)}"
                )
            content.append({"depTaskCode": task.code, "status": status})
        return content
[docs]
class SUCCESS(Status):
    """Condition item with status name ``SUCCESS``, sub class of :class:`Status`."""

    def __init__(self, *tasks):
        """Forward all positional ``tasks`` unchanged to :class:`Status`."""
        super().__init__(*tasks)
[docs]
class FAILURE(Status):
    """Condition item with status name ``FAILURE``, sub class of :class:`Status`."""

    def __init__(self, *tasks):
        """Forward all positional ``tasks`` unchanged to :class:`Status`."""
        super().__init__(*tasks)
[docs]
class ConditionOperator(Base):
    """Set ConditionTask or ConditionOperator with specific operator.

    :param args: Operands of the operator. Every operand must be a :class:`Status`
        or a :class:`ConditionOperator` instance, and all of them must share the
        same ``repr`` value. They are only validated in :func:`set_define_attr`.
    """

    # Attributes that are always part of the definition built by ``get_define``.
    _DEFINE_ATTR = {
        "relation",
    }

    def __init__(self, *args):
        super().__init__(self.__class__.__name__)
        self.args = args

    def __repr__(self) -> str:
        # An enclosing operator uses this value as the attribute name under which
        # the definitions of its arguments are stored.
        return "depend_task_list"

    @classmethod
    def operator_name(cls) -> str:
        """Get operator name in different class, which is the upper-cased class name."""
        return cls.__name__.upper()

    @property
    def relation(self) -> str:
        """Get operator name in different class, for function :func:`get_define`."""
        return self.operator_name()

    def set_define_attr(self) -> str:
        """Set attribute to function :func:`get_define`.

        It is a wrapper for both `And` and `Or` operator. The definitions of all
        operands are collected into one list, which is stored on this instance under
        the attribute named by the ``repr`` of the operands.

        :return: Name of the attribute that was set on this instance.
        :raises PyDSParamException: If no operand was given, if an operand is neither
            a :class:`Status` nor a :class:`ConditionOperator`, or if the operands do
            not all share the same ``repr`` value.
        """
        result = []
        attr = None
        for condition in self.args:
            if not isinstance(condition, (Status, ConditionOperator)):
                # Exceptions do not interpolate ``%s`` placeholders from extra
                # arguments, so the message has to be formatted before raising.
                raise PyDSParamException(
                    f"Condition {self.relation} operator parameter support "
                    f"ConditionTask and ConditionOperator but got {type(condition)}."
                )
            if attr is None:
                attr = repr(condition)
            elif repr(condition) != attr:
                raise PyDSParamException(
                    f"Condition {self.relation} operator parameter only support "
                    "same type."
                )
            if attr == "depend_item_list":
                # Status.get_define returns a list of items, keep the result flat.
                result.extend(condition.get_define())
            else:
                # A nested operator contributes a single definition.
                result.append(condition.get_define())
        if attr is None:
            # Without any operand there is no attribute name to set; fail with a
            # clear message instead of the TypeError raised by setattr(self, None, ...).
            raise PyDSParamException(
                f"Condition {self.relation} operator requires at least one parameter."
            )
        setattr(self, attr, result)
        return attr

    def get_define(self, camel_attr=True) -> dict:
        """Overwrite Base.get_define to get task Condition specific get define.

        :param camel_attr: Accepted to keep the signature, but not forwarded:
            ``get_define_custom`` is always called with ``camel_attr=True``.
        :return: Result of ``get_define_custom`` for ``relation`` plus the attribute
            set by :func:`set_define_attr`.
        :raises PyDSParamException: Propagated from :func:`set_define_attr`.
        """
        attr = self.set_define_attr()
        dependent_define_attr = self._DEFINE_ATTR.union({attr})
        return super().get_define_custom(
            camel_attr=True, custom_attr=dependent_define_attr
        )
[docs]
class And(ConditionOperator):
    """Operator ``AND`` for task condition.

    Its operands are handled by :class:`ConditionOperator`, which accepts
    :class:`Status` instances or other :class:`ConditionOperator` instances, and the
    relation reported for them is ``AND``.
    """

    def __init__(self, *args):
        """Forward all positional ``args`` unchanged to :class:`ConditionOperator`."""
        super().__init__(*args)
[docs]
class Or(ConditionOperator):
    """Operator ``OR`` for task condition.

    Its operands are handled by :class:`ConditionOperator`, which accepts
    :class:`Status` instances or other :class:`ConditionOperator` instances, and the
    relation reported for them is ``OR``.
    """

    def __init__(self, *args):
        """Forward all positional ``args`` unchanged to :class:`ConditionOperator`."""
        super().__init__(*args)
[docs]
class Condition(BatchTask):
    """Condition task, declaring the behavior of a condition task to dolphinscheduler.

    :param name: Task name, passed to :class:`BatchTask`.
    :param condition: Operator tree describing the condition to evaluate.
    :param success_task: Task set as downstream and reported as ``successNode``.
    :param failed_task: Task set as downstream and reported as ``failedNode``.
    """

    def __init__(
        self,
        name: str,
        condition: ConditionOperator,
        success_task: Task,
        failed_task: Task,
        *args,
        **kwargs,
    ):
        super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
        self.condition = condition
        self.success_task = success_task
        self.failed_task = failed_task
        # Wire the dependencies once all attributes read by _set_dep are in place.
        self._set_dep()

    def _set_dep(self) -> None:
        """Set upstream and downstream tasks according to parameter `condition`.

        Upstream tasks are the ``tasks`` of every operand of each operator found
        directly in ``condition.args``; operands of ``condition`` that are not
        operators are skipped. Downstream tasks are the success and failed task.
        """
        upstream = []
        for operator in self.condition.args:
            if not isinstance(operator, ConditionOperator):
                continue
            for status in operator.args:
                upstream.extend(status.tasks)
        self.set_upstream(upstream)
        downstream = [self.success_task, self.failed_task]
        self.set_downstream(downstream)

    @property
    def condition_result(self) -> dict:
        """Get condition result define for java gateway.

        :return: Dict with keys ``successNode`` and ``failedNode``, each holding a
            one-item list with the code of the matching task.
        """
        success_codes = [self.success_task.code]
        failed_codes = [self.failed_task.code]
        return {
            "successNode": success_codes,
            "failedNode": failed_codes,
        }

    @property
    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
        """Override Task.task_params for Condition task.

        The parameters of the parent class are taken as they are and the key
        ``dependence`` is set to the definition of ``condition``. The class attribute
        `_task_custom_attr` is not used for this, to avoid attribute cover.

        ``camel_attr`` and ``custom_attr`` are not read here.
        """
        params = super().task_params
        dependence = self.condition.get_define()
        params["dependence"] = dependence
        return params