# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task Python."""
import logging
import re
import types
from pathlib import Path
from typing import Union
from stmdency.extractor import Extractor
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
log = logging.getLogger(__file__)
[docs]
class Python(Task):
"""Task Python object, declare behavior for Python task to dolphinscheduler.
Python task support two types of parameters for :param:``definition``, and here is an example:
Using str type of :param:``definition``
.. code-block:: python
python_task = Python(name="str_type", definition="print('Hello Python task.')")
Or using Python callable type of :param:``definition``
.. code-block:: python
def foo():
print("Hello Python task.")
python_task = Python(name="str_type", definition=foo)
:param name: The name for Python task. It define the task name.
:param definition: String format of Python script you want to execute or Python callable you
want to execute.
"""
_task_custom_attr = {"raw_script"}
ext: set = {".py"}
ext_attr: Union[str, types.FunctionType] = "_definition"
def __init__(
self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
):
self._definition = definition
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
[docs]
def _build_exe_str(self) -> str:
"""Build executable string from given definition.
Attribute ``self.definition`` almost is a function, we need to call this function after parsing it
to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too.
"""
definition = getattr(self, "definition")
if isinstance(definition, types.FunctionType):
loc = definition.__code__.co_filename
extractor = Extractor(Path(loc).open("r").read())
stm = extractor.get_code(definition.__name__)
func_str = f"{stm}{definition.__name__}()"
else:
pattern = re.compile("^def (\\w+)\\(")
find = pattern.findall(definition)
if not find:
return definition
# Keep function str and function callable always have one blank line
func_str = (
f"{definition}{find[0]}()"
if definition.endswith("\n")
else f"{definition}\n{find[0]}()"
)
return func_str
@property
def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
if isinstance(getattr(self, "definition"), (str, types.FunctionType)):
return self._build_exe_str()
else:
raise PyDSParamException(
"Parameter definition do not support % for now.",
type(getattr(self, "definition")),
)