Source code for pydolphinscheduler.tasks.dependent
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Task dependent."""importwarningsfromtypingimportDict,Optional,Tuplefrompydolphinscheduler.constantsimportTaskTypefrompydolphinscheduler.core.taskimportTaskfrompydolphinscheduler.exceptionsimportPyDSJavaGatewayException,PyDSParamExceptionfrompydolphinscheduler.java_gatewayimportgatewayfrompydolphinscheduler.models.baseimportBaseDEPENDENT_ALL_TASK_IN_WORKFLOW="0"
[docs]classDependentDate(str):"""Constant of Dependent date value. These values set according to Java server models, if you want to add and change it, please change Java server models first. """# TODO Maybe we should add parent level to DependentDate for easy to use, such as# DependentDate.MONTH.THIS_MONTH# HourCURRENT_HOUR="currentHour"LAST_ONE_HOUR="last1Hour"LAST_TWO_HOURS="last2Hours"LAST_THREE_HOURS="last3Hours"LAST_TWENTY_FOUR_HOURS="last24Hours"# DayTODAY="today"LAST_ONE_DAYS="last1Days"LAST_TWO_DAYS="last2Days"LAST_THREE_DAYS="last3Days"LAST_SEVEN_DAYS="last7Days"# WeekTHIS_WEEK="thisWeek"LAST_WEEK="lastWeek"LAST_MONDAY="lastMonday"LAST_TUESDAY="lastTuesday"LAST_WEDNESDAY="lastWednesday"LAST_THURSDAY="lastThursday"LAST_FRIDAY="lastFriday"LAST_SATURDAY="lastSaturday"LAST_SUNDAY="lastSunday"# MonthTHIS_MONTH="thisMonth"LAST_MONTH="lastMonth"LAST_MONTH_BEGIN="lastMonthBegin"LAST_MONTH_END="lastMonthEnd"
[docs]classDependentItem(Base):"""Dependent item object, minimal unit for task dependent. It declares which project, workflow, task are dependent to this task. """_DEFINE_ATTR={"project_code","definition_code","dep_task_code","cycle","date_value",}# TODO maybe we should conside overwrite operator `and` and `or` for DependentItem to# support more easy way to set relationdef__init__(self,project_name:str,# TODO zhongjiajie should be also depeloped in 4.1.0workflow_name:Optional[str]=None,dependent_task_name:Optional[str]=DEPENDENT_ALL_TASK_IN_WORKFLOW,dependent_date:Optional[DependentDate]=DependentDate.TODAY,*args,**kwargs,):obj_name=(f"{project_name}.{workflow_name}.{dependent_task_name}.{dependent_date}")super().__init__(obj_name)self.project_name=project_nameifworkflow_nameisnotNone:self.workflow_name=workflow_nameelif"process_definition_name"inkwargs:warnings.warn("Parameter name `process_definition_name` is deprecated and will be remove in 4.1.0, ""please use `workflow_name` instead.",DeprecationWarning,stacklevel=2,)self.workflow_name=kwargs.pop("process_definition_name")else:raisePyDSParamException("Parameter `workflow_name` or `process_definition_name` is required, but got None.")self.dependent_task_name=dependent_task_nameifdependent_dateisNone:raisePyDSParamException("Parameter dependent_date must provider by got None.")else:self.dependent_date=dependent_dateself._code={}def__repr__(self)->str:return"depend_item_list"@propertydefproject_code(self)->str:"""Get dependent project code."""returnself.get_code_from_gateway().get("projectCode")@propertydefdefinition_code(self)->str:"""Get dependent definition code."""returnself.get_code_from_gateway().get("processDefinitionCode")@propertydefdep_task_code(self)->str:"""Get dependent tasks code list."""ifself.is_all_task:returnDEPENDENT_ALL_TASK_IN_WORKFLOWelse:returnself.get_code_from_gateway().get("taskDefinitionCode")# TODO Maybe we should get cycle from dependent date class.@propertydefcycle(self)->str:"""Get dependent cycle."""if"Hour"inself.dependent_date:return"hour"elifself.dependent_date=="today"or"Days"inself.dependent_date:return"day"elif"Month"inself.dependent_date:return"month"else:return"week"@propertydefdate_value(self)->str:"""Get dependent date."""returnself.dependent_date@propertydefis_all_task(self)->bool:"""Check whether dependent all tasks or not."""returnself.dependent_task_name==DEPENDENT_ALL_TASK_IN_WORKFLOW@propertydefcode_parameter(self)->Tuple:"""Get name info parameter to query code."""param=(self.project_name,self.workflow_name,self.dependent_task_nameifnotself.is_all_taskelseNone,)returnparam
[docs]defget_code_from_gateway(self)->Dict:"""Get project, definition, task code from given parameter."""ifself._code:returnself._codeelse:try:self._code=gateway.get_dependent_info(*self.code_parameter)returnself._codeexceptException:raisePyDSJavaGatewayException("Function get_code_from_gateway error.")
[docs]classDependentOperator(Base):"""Set DependentItem or dependItemList with specific operator."""_DEFINE_ATTR={"relation",}def__init__(self,*args):super().__init__(self.__class__.__name__)self.args=argsdef__repr__(self)->str:return"depend_task_list"
[docs]@classmethoddefoperator_name(cls)->str:"""Get operator name in different class."""returncls.__name__.upper()
@propertydefrelation(self)->str:"""Get operator name in different class, for function :func:`get_define`."""returnself.operator_name()
[docs]defset_define_attr(self)->str:"""Set attribute to function :func:`get_define`. It is a wrapper for both `And` and `Or` operator. """result=[]attr=Nonefordependentinself.args:ifisinstance(dependent,(DependentItem,DependentOperator)):ifattrisNone:attr=repr(dependent)elifrepr(dependent)!=attr:raisePyDSParamException("Dependent %s operator parameter only support same type.",self.relation,)else:raisePyDSParamException("Dependent %s operator parameter support DependentItem and ""DependentOperator but got %s.",(self.relation,type(dependent)),)result.append(dependent.get_define())setattr(self,attr,result)returnattr
[docs]defget_define(self,camel_attr=True)->Dict:"""Overwrite Base.get_define to get task dependent specific get define."""attr=self.set_define_attr()dependent_define_attr=self._DEFINE_ATTR.union({attr})returnsuper().get_define_custom(camel_attr=True,custom_attr=dependent_define_attr)
[docs]classAnd(DependentOperator):"""Operator And for task dependent. It could accept both :class:`DependentItem` and children of :class:`DependentOperator`, and set AND condition to those args. """def__init__(self,*args):super().__init__(*args)
[docs]classOr(DependentOperator):"""Operator Or for task dependent. It could accept both :class:`DependentItem` and children of :class:`DependentOperator`, and set OR condition to those args. """def__init__(self,*args):super().__init__(*args)
[docs]classDependent(Task):"""Task dependent object, declare behavior for dependent task to dolphinscheduler."""def__init__(self,name:str,dependence:DependentOperator,*args,**kwargs):super().__init__(name,TaskType.DEPENDENT,*args,**kwargs)self.dependence=dependence@propertydeftask_params(self,camel_attr:bool=True,custom_attr:set=None)->Dict:"""Override Task.task_params for dependent task. Dependent task have some specials attribute `dependence`, and in most of the task this attribute is None and use empty dict `{}` as default value. We do not use class attribute `_task_custom_attr` due to avoid attribute cover. """params=super().task_paramsparams["dependence"]=self.dependence.get_define()returnparams