# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task dependent."""
from __future__ import annotations
import warnings
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models.base import Base
# Sentinel meaning "every task in the workflow": it is the default
# `dependent_task_name` of DependentItem and is also returned as its
# `dep_task_code` when no single task is targeted.
DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
[docs]
class DependentDate(str):
    """Named string constants for the date value of a dependency.

    Each constant mirrors a value defined by the Java server models. To add or
    change one, update the Java server models first and then this class.
    """

    # TODO Maybe we should add a parent level to DependentDate to make it easier
    # to use, such as DependentDate.MONTH.THIS_MONTH

    # Hour based values
    CURRENT_HOUR = "currentHour"
    LAST_ONE_HOUR = "last1Hour"
    LAST_TWO_HOURS = "last2Hours"
    LAST_THREE_HOURS = "last3Hours"
    LAST_TWENTY_FOUR_HOURS = "last24Hours"

    # Day based values
    TODAY = "today"
    LAST_ONE_DAYS = "last1Days"
    LAST_TWO_DAYS = "last2Days"
    LAST_THREE_DAYS = "last3Days"
    LAST_SEVEN_DAYS = "last7Days"

    # Week based values
    THIS_WEEK = "thisWeek"
    LAST_WEEK = "lastWeek"
    LAST_MONDAY = "lastMonday"
    LAST_TUESDAY = "lastTuesday"
    LAST_WEDNESDAY = "lastWednesday"
    LAST_THURSDAY = "lastThursday"
    LAST_FRIDAY = "lastFriday"
    LAST_SATURDAY = "lastSaturday"
    LAST_SUNDAY = "lastSunday"

    # Month based values
    THIS_MONTH = "thisMonth"
    LAST_MONTH = "lastMonth"
    LAST_MONTH_BEGIN = "lastMonthBegin"
    LAST_MONTH_END = "lastMonthEnd"
[docs]
class DependentItem(Base):
    """Dependent item object, the minimal unit of a task dependency.

    It declares which project, workflow and task the dependency points at, and
    which date value the dependency applies to.

    :param project_name: Name of the project the dependency lives in.
    :param workflow_name: Name of the workflow depended on. Required unless the
        deprecated keyword ``process_definition_name`` is given instead.
    :param dependent_task_name: Name of the task depended on. The default,
        ``DEPENDENT_ALL_TASK_IN_WORKFLOW``, targets all tasks of the workflow.
    :param dependent_date: Date value of the dependency, see :class:`DependentDate`.
    :raises PyDSParamException: If neither ``workflow_name`` nor
        ``process_definition_name`` is given, or if ``dependent_date`` is None.
    """

    _DEFINE_ATTR = {
        "project_code",
        "definition_code",
        "dep_task_code",
        "cycle",
        "date_value",
    }

    # TODO maybe we should consider overloading operators `and` and `or` for
    # DependentItem to support an easier way to set relations

    def __init__(
        self,
        project_name: str,
        # TODO zhongjiajie: revisit in 4.1.0 (presumably make this required once
        # the deprecated `process_definition_name` keyword is removed)
        workflow_name: str | None = None,
        dependent_task_name: str | None = DEPENDENT_ALL_TASK_IN_WORKFLOW,
        dependent_date: DependentDate | None = DependentDate.TODAY,
        *args,
        **kwargs,
    ):
        # Resolve the workflow name BEFORE building the object name, otherwise a
        # caller still using deprecated `process_definition_name` would get the
        # literal "None" embedded in the object name.
        if workflow_name is None:
            if "process_definition_name" not in kwargs:
                raise PyDSParamException(
                    "Parameter `workflow_name` or `process_definition_name` is required, but got None."
                )
            warnings.warn(
                "Parameter name `process_definition_name` is deprecated and will be remove in 4.1.0, "
                "please use `workflow_name` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            workflow_name = kwargs.pop("process_definition_name")

        if dependent_date is None:
            # Message text kept unchanged so callers matching on it keep working.
            raise PyDSParamException(
                "Parameter dependent_date must provider by got None."
            )

        obj_name = (
            f"{project_name}.{workflow_name}.{dependent_task_name}.{dependent_date}"
        )
        super().__init__(obj_name)
        self.project_name = project_name
        self.workflow_name = workflow_name
        self.dependent_task_name = dependent_task_name
        self.dependent_date = dependent_date
        # Cache of the codes returned by the gateway, filled on first lookup.
        self._code = {}

    def __repr__(self) -> str:
        # DependentOperator.set_define_attr uses this repr as the attribute name
        # under which a list of items is stored, so it must stay constant.
        return "depend_item_list"

    @property
    def project_code(self) -> str:
        """Get dependent project code."""
        return self.get_code_from_gateway().get("projectCode")

    @property
    def definition_code(self) -> str:
        """Get dependent definition code."""
        return self.get_code_from_gateway().get("processDefinitionCode")

    @property
    def dep_task_code(self) -> str:
        """Get dependent task code, or the all-task sentinel when no task is targeted."""
        if self.is_all_task:
            # No gateway lookup is needed when depending on the whole workflow.
            return DEPENDENT_ALL_TASK_IN_WORKFLOW
        return self.get_code_from_gateway().get("taskDefinitionCode")

    # TODO Maybe we should get cycle from dependent date class.
    @property
    def cycle(self) -> str:
        """Get dependent cycle (``hour``, ``day``, ``month`` or ``week``).

        The cycle is derived from substrings of ``dependent_date``; anything that
        is not recognised as hour, day or month falls back to ``week``.
        """
        date = self.dependent_date
        if "Hour" in date:
            return "hour"
        if date == "today" or "Days" in date:
            return "day"
        if "Month" in date:
            return "month"
        return "week"

    @property
    def date_value(self) -> str:
        """Get dependent date."""
        return self.dependent_date

    @property
    def is_all_task(self) -> bool:
        """Check whether dependent all tasks or not."""
        return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW

    @property
    def code_parameter(self) -> tuple:
        """Get name info parameter to query code.

        The task name slot is None when the item depends on all tasks.
        """
        return (
            self.project_name,
            self.workflow_name,
            None if self.is_all_task else self.dependent_task_name,
        )

    def get_code_from_gateway(self) -> dict:
        """Get project, definition, task code from given parameter.

        The gateway is queried only while the cached result is empty.

        :raises PyDSJavaGatewayException: If the gateway call fails; the original
            error is chained as ``__cause__``.
        """
        if self._code:
            return self._code
        try:
            self._code = gateway.get_dependent_info(*self.code_parameter)
        except Exception as err:
            # Chain explicitly so the real gateway failure stays visible.
            raise PyDSJavaGatewayException(
                "Function get_code_from_gateway error."
            ) from err
        return self._code
[docs]
class DependentOperator(Base):
    """Set DependentItem or dependItemList with specific operator.

    Holds the positional arguments it was built with, each of which must be a
    :class:`DependentItem` or another :class:`DependentOperator`, and exposes
    them together with the operator's ``relation`` through :func:`get_define`.
    """

    _DEFINE_ATTR = {
        "relation",
    }

    def __init__(self, *args):
        super().__init__(self.__class__.__name__)
        self.args = args

    def __repr__(self) -> str:
        # set_define_attr uses this repr as the attribute name under which a
        # list of nested operators is stored, so it must stay constant.
        return "depend_task_list"

    @classmethod
    def operator_name(cls) -> str:
        """Get operator name in different class."""
        return cls.__name__.upper()

    @property
    def relation(self) -> str:
        """Get operator name in different class, for function :func:`get_define`."""
        return self.operator_name()

    def set_define_attr(self, camel_attr: bool = True) -> str:
        """Set attribute to function :func:`get_define`.

        It is a wrapper for both `And` and `Or` operator. The definitions of all
        arguments are collected into a list and stored on ``self`` under the
        attribute named by the arguments' ``repr``.

        :param camel_attr: Forwarded to each argument's ``get_define``.
        :return: Name of the attribute the collected definitions were stored in.
        :raises PyDSParamException: If no argument was given, an argument is
            neither DependentItem nor DependentOperator, or the arguments mix
            both kinds.
        """
        if not self.args:
            # Without this guard `attr` stays None and setattr() fails with an
            # unrelated-looking TypeError.
            raise PyDSParamException(
                f"Dependent {self.relation} operator requires at least one "
                "DependentItem or DependentOperator."
            )

        result = []
        attr = None
        for dependent in self.args:
            if not isinstance(dependent, (DependentItem, DependentOperator)):
                raise PyDSParamException(
                    f"Dependent {self.relation} operator parameter support DependentItem and "
                    f"DependentOperator but got {type(dependent)}."
                )
            if attr is None:
                attr = repr(dependent)
            elif repr(dependent) != attr:
                raise PyDSParamException(
                    f"Dependent {self.relation} operator parameter only support same type."
                )
            result.append(dependent.get_define(camel_attr))
        setattr(self, attr, result)
        return attr

    def get_define(self, camel_attr=True) -> dict:
        """Overwrite Base.get_define to get task dependent specific get define.

        :param camel_attr: Passed through to ``get_define_custom`` and to the
            nested arguments, instead of being silently forced to True.
        """
        attr = self.set_define_attr(camel_attr)
        dependent_define_attr = self._DEFINE_ATTR.union({attr})
        return super().get_define_custom(
            camel_attr=camel_attr, custom_attr=dependent_define_attr
        )
[docs]
class And(DependentOperator):
    """Logical AND operator for task dependent.

    Accepts :class:`DependentItem` objects as well as children of
    :class:`DependentOperator`, and applies an AND condition to those args.
    """

    def __init__(self, *args):
        # Pure delegation: every positional argument goes to the parent operator.
        super().__init__(*args)
[docs]
class Or(DependentOperator):
    """Logical OR operator for task dependent.

    Accepts :class:`DependentItem` objects as well as children of
    :class:`DependentOperator`, and applies an OR condition to those args.
    """

    def __init__(self, *args):
        # Pure delegation: every positional argument goes to the parent operator.
        super().__init__(*args)
[docs]
class Dependent(BatchTask):
    """Dependent task object, declaring dependent task behavior to dolphinscheduler.

    :param name: Task name, passed to the parent task constructor.
    :param dependence: Operator whose ``get_define()`` result becomes the
        ``dependence`` entry of this task's parameters.
    """

    def __init__(self, name: str, dependence: DependentOperator, *args, **kwargs):
        super().__init__(name, TaskType.DEPENDENT, *args, **kwargs)
        self.dependence = dependence

    @property
    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
        """Override Task.task_params for dependent task.

        A dependent task carries the special attribute `dependence`, which most
        other tasks leave as None and fall back to an empty dict `{}` for. The
        class attribute `_task_custom_attr` is deliberately not used here, to
        avoid attribute cover.
        """
        # Start from the parent's parameters, then set the dependence definition.
        task_define = super().task_params
        task_define["dependence"] = self.dependence.get_define()
        return task_define