# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""DolphinScheduler Task and TaskRelation object."""

import copy
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union

from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
    Delimiter,
    ResourceKey,
    TaskFlag,
    TaskPriority,
    TaskTimeoutFlag,
)
from pydolphinscheduler.core.process_definition import (
    ProcessDefinition,
    ProcessDefinitionContext,
)
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base

logger = getLogger(__name__)


class TaskRelation(Base):
    """TaskRelation object, describe the relation of exactly two tasks.

    A relation links an upstream task (``pre_task_code``) to a downstream
    task (``post_task_code``).
    """

    # `_KEY_ATTR` overrides :func:`__eq__` so that two relations between the
    # same pair of task codes compare equal; together with :func:`__hash__`
    # this lets the set `Task.process_definition._task_relations` de-duplicate
    # relations correctly.
    _KEY_ATTR = {"pre_task_code", "post_task_code"}

    # Attributes exported when this relation is serialized.
    _DEFINE_ATTR = {"pre_task_code", "post_task_code"}

    # Fixed default values attached to every relation definition.
    _DEFAULT_ATTR = dict(
        name="",
        preTaskVersion=1,
        postTaskVersion=1,
        conditionType=0,
        conditionParams={},
    )

    def __init__(
        self,
        pre_task_code: int,
        post_task_code: int,
        name: Optional[str] = None,
    ):
        """Create a relation from ``pre_task_code`` to ``post_task_code``.

        :param pre_task_code: code of the upstream task.
        :param post_task_code: code of the downstream task.
        :param name: optional display name of the relation.
        """
        super().__init__(name)
        self.pre_task_code, self.post_task_code = pre_task_code, post_task_code

    def __hash__(self):
        """Hash on the directed ``pre -> post`` pair of task codes."""
        relation_key = "{}{}{}".format(
            self.pre_task_code, Delimiter.DIRECTION, self.post_task_code
        )
        return hash(relation_key)
class Task(Base):
    """Task object, parent class for all exactly task type.

    Holds the attributes shared by every task type. It registers itself with
    its process definition on creation and supports ``>>`` / ``<<`` to declare
    dependencies between tasks.
    """

    _DEFINE_ATTR = {
        "name",
        "code",
        "version",
        "task_type",
        "task_params",
        "description",
        "flag",
        "task_priority",
        "worker_group",
        "environment_code",
        "delay_time",
        "fail_retry_times",
        "fail_retry_interval",
        "timeout_flag",
        "timeout_notify_strategy",
        "timeout",
    }

    # Task default attributes that are put into the `task_params` property.
    _task_default_attr = {
        "local_params",
        "resource_list",
        "dependence",
        "wait_start_timeout",
        "condition_result",
    }
    # Attributes removed from `_task_default_attr`; they will not be put into `task_params`.
    _task_ignore_attr: set = set()
    # Custom attributes defined by subclasses; they are appended to `task_params`.
    _task_custom_attr: set = set()

    # Template only: every instance receives its own deep copy (see __init__).
    DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}

    def __init__(
        self,
        name: str,
        task_type: str,
        description: Optional[str] = None,
        flag: Optional[str] = TaskFlag.YES,
        task_priority: Optional[str] = TaskPriority.MEDIUM,
        worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
        environment_name: Optional[str] = None,
        delay_time: Optional[int] = 0,
        fail_retry_times: Optional[int] = 0,
        fail_retry_interval: Optional[int] = 1,
        timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
        timeout_notify_strategy: Optional = None,
        timeout: Optional[int] = 0,
        process_definition: Optional[ProcessDefinition] = None,
        local_params: Optional[List] = None,
        resource_list: Optional[List] = None,
        dependence: Optional[Dict] = None,
        wait_start_timeout: Optional[Dict] = None,
        condition_result: Optional[Dict] = None,
    ):
        """Create a task and register it with its process definition.

        :param process_definition: owning process definition. When omitted,
            the one returned by ``ProcessDefinitionContext.get()`` is used.
        :param resource_list: resources as ``List[str]`` paths (preferred) or
            the deprecated ``[{"id": ...}]`` form.
        :param condition_result: condition result mapping; defaults to a fresh
            copy of :attr:`DEFAULT_CONDITION_RESULT`.
        """
        super().__init__(name, description)
        self.task_type = task_type
        self.flag = flag
        self.task_priority = task_priority
        self.worker_group = worker_group
        self._environment_name = environment_name
        self.fail_retry_times = fail_retry_times
        self.fail_retry_interval = fail_retry_interval
        self.delay_time = delay_time
        self.timeout_flag = timeout_flag
        self.timeout_notify_strategy = timeout_notify_strategy
        self.timeout = timeout
        self._process_definition = None
        self.process_definition: ProcessDefinition = (
            process_definition or ProcessDefinitionContext.get()
        )
        self._upstream_task_codes: Set[int] = set()
        self._downstream_task_codes: Set[int] = set()
        self._task_relation: Set[TaskRelation] = set()
        # code and version must be generated after process_definition is set:
        # gen_code_and_version reads the process definition's project and name.
        self.code, self.version = self.gen_code_and_version()
        # Register the task with its process definition, but never twice.
        # The duplicate warning only applies when a process definition exists;
        # without one there is nothing to have been "already" added to.
        if self.process_definition is not None:
            if self.code not in self.process_definition.tasks:
                self.process_definition.add_task(self)
            else:
                logger.warning(
                    "Task code %d already in process definition, prohibit re-add task.",
                    self.code,
                )

        # Attributes for task params
        self.local_params = local_params or []
        self._resource_list = resource_list or []
        self.dependence = dependence or {}
        self.wait_start_timeout = wait_start_timeout or {}
        # Deep-copy the class-level default so that instances never share,
        # and accidentally mutate, a single dict.
        self._condition_result = condition_result or copy.deepcopy(
            self.DEFAULT_CONDITION_RESULT
        )

    @property
    def process_definition(self) -> Optional[ProcessDefinition]:
        """Get attribute process_definition."""
        return self._process_definition

    @process_definition.setter
    def process_definition(self, process_definition: Optional[ProcessDefinition]):
        """Set attribute process_definition."""
        self._process_definition = process_definition

    @property
    def resource_list(self) -> List:
        """Get task define attribute `resource_list` as ``[{ResourceKey.ID: id}, ...]``.

        String entries are resolved to ids through :class:`Resource`. Dict entries
        carrying an id are accepted but deprecated. Dict entries without an id are
        skipped. Duplicate ids are removed, keeping the order in which they first
        appear.
        """
        # A dict is used as an insertion-ordered set so that the output order
        # follows the user's declaration order instead of set hashing order.
        resource_ids: Dict = {}
        for res in self._resource_list:
            if isinstance(res, str):
                resource_id = Resource(
                    name=res, user_name=self.user_name
                ).get_id_from_database()
                resource_ids[resource_id] = None
            elif isinstance(res, dict) and res.get(ResourceKey.ID) is not None:
                logger.warning(
                    "`resource_list` should be defined using List[str] with resource paths, "
                    "the use of ids to define resources will be remove in version 3.2.0."
                )
                resource_ids[res.get(ResourceKey.ID)] = None
        return [{ResourceKey.ID: r} for r in resource_ids]

    @property
    def user_name(self) -> Optional[str]:
        """Return user name of process definition.

        :raises PyDSParamException: if the task has no process definition.
        """
        if self.process_definition:
            return self.process_definition.user.name
        raise PyDSParamException("`user_name` cannot be empty.")

    @property
    def condition_result(self) -> Dict:
        """Get attribute condition_result."""
        return self._condition_result

    @condition_result.setter
    def condition_result(self, condition_result: Optional[Dict]):
        """Set attribute condition_result."""
        self._condition_result = condition_result

    def _get_attr(self) -> Set[str]:
        """Get final task task_params attribute.

        Start from `_task_default_attr`, subtract the attributes listed in
        `_task_ignore_attr`, then add those in `_task_custom_attr`. A new set is
        returned, so the class-level sets are never modified.
        """
        attr = set(self._task_default_attr)
        attr -= self._task_ignore_attr
        attr |= self._task_custom_attr
        return attr

    @property
    def task_params(self) -> Optional[Dict]:
        """Get task parameter object.

        Built by :func:`get_define_custom` from the attribute names returned
        by :func:`_get_attr`.
        """
        custom_attr = self._get_attr()
        return self.get_define_custom(custom_attr=custom_attr)

    def __hash__(self):
        return hash(self.code)

    def __lshift__(self, other: Union["Task", Sequence["Task"]]):
        """Implement Task << Task."""
        self.set_upstream(other)
        return other

    def __rshift__(self, other: Union["Task", Sequence["Task"]]):
        """Implement Task >> Task."""
        self.set_downstream(other)
        return other

    def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
        """Call for [Task] >> Task, because list doesn't have the __rshift__ operator."""
        self.__lshift__(other)
        return self

    def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
        """Call for [Task] << Task, because list doesn't have the __lshift__ operator."""
        self.__rshift__(other)
        return self

    def _set_deps(
        self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True
    ) -> None:
        """Set parameter tasks dependent to current task.

        It is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
        Both endpoints' upstream/downstream code sets are updated. When this task
        belongs to a process definition, a :class:`TaskRelation` is also
        recorded there.

        :param tasks: a single task or a sequence of tasks.
        :param upstream: ``True`` makes ``tasks`` upstream of ``self``;
            ``False`` makes them downstream.
        """
        if not isinstance(tasks, Sequence):
            tasks = [tasks]

        for task in tasks:
            # Normalise to a directed edge pre -> post so both directions share one code path.
            pre, post = (task, self) if upstream else (self, task)
            post._upstream_task_codes.add(pre.code)
            pre._downstream_task_codes.add(post.code)
            if self._process_definition:
                task_relation = TaskRelation(
                    pre_task_code=pre.code,
                    post_task_code=post.code,
                    name=f"{pre.name}{Delimiter.DIRECTION}{post.name}",
                )
                self.process_definition._task_relations.add(task_relation)

    def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
        """Set parameter tasks as upstream to current task."""
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
        """Set parameter tasks as downstream to current task."""
        self._set_deps(tasks, upstream=False)

    # TODO: codes would be better generated in bulk when :ref:`processDefinition`
    #  runs submit or start, instead of one gateway call per task.
    def gen_code_and_version(self) -> Tuple:
        """Generate task code and version from the Java gateway.

        If the task name does not yet exist in the process definition, the Java
        gateway generates a new code with version 0. Otherwise it returns the
        existing code and version.

        NOTE(review): this reads ``self.process_definition._project``, so it
        presumably fails with ``AttributeError`` when the task has no process
        definition. Confirm whether a clearer error is wanted.
        """
        # TODO get code from specific project process definition and task name
        result = JavaGate().get_code_and_version(
            self.process_definition._project, self.process_definition.name, self.name
        )
        return result.get("code"), result.get("version")

    @property
    def environment_code(self) -> Optional[str]:
        """Convert environment name to code via the Java gateway.

        Returns ``None`` when no environment name was given.
        """
        if self._environment_name is None:
            return None
        return JavaGate().query_environment_info(self._environment_name)