Source code for pydolphinscheduler.tasks.http

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Task shell."""

from __future__ import annotations

import warnings

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.parameter import Direction, ParameterHelper
from pydolphinscheduler.core.task import BatchTask
from pydolphinscheduler.exceptions import PyDSParamException


class HttpMethod:
    """Names of the HTTP request methods an HTTP task may use.

    Every constant's value is the same string as its attribute name.
    """

    # Methods are listed in the order they were originally declared.
    GET = "GET"
    POST = "POST"
    HEAD = "HEAD"
    PUT = "PUT"
    DELETE = "DELETE"
class HttpCheckCondition:
    """Names of the checks that decide whether an HTTP task succeeded.

    Four conditions are currently defined:

    - STATUS_CODE_DEFAULT: success when the HTTP response code equals 200.
    - STATUS_CODE_CUSTOM: success when the HTTP response code equals the
      code defined by the user.
    - BODY_CONTAINS: success when the HTTP response body contains the text
      defined by the user.
    - BODY_NOT_CONTAINS: success when the HTTP response body does not
      contain the text defined by the user.
    """

    # Status-code based checks.
    STATUS_CODE_DEFAULT = "STATUS_CODE_DEFAULT"
    STATUS_CODE_CUSTOM = "STATUS_CODE_CUSTOM"

    # Response-body based checks.
    BODY_CONTAINS = "BODY_CONTAINS"
    BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
class Http(BatchTask):
    """Task HTTP object, declare behavior for HTTP task to dolphinscheduler.

    :param name: The name or identifier for the HTTP task.
    :param url: The URL endpoint for the HTTP request.
    :param http_method: The HTTP method for the request (GET, POST, etc.).
        Must be the name of a constant declared on :class:`HttpMethod`.
        Defaults to HttpMethod.GET.
    :param http_params: Parameters for the HTTP request, as a dictionary.
        A list is no longer accepted: it triggers a ``DeprecationWarning``
        and is ignored. Defaults to None.
    :param http_check_condition: Condition for checking the HTTP response
        status. Must be the name of a constant declared on
        :class:`HttpCheckCondition`. Defaults to
        HttpCheckCondition.STATUS_CODE_DEFAULT.
    :param condition: Additional condition to evaluate; required when
        ``http_check_condition`` is not STATUS_CODE_DEFAULT.
    :param connect_timeout: Connection timeout for the HTTP request in
        milliseconds. Defaults to 60000.
    :param socket_timeout: Socket timeout for the HTTP request in
        milliseconds. Defaults to 60000.

    :raises PyDSParamException: If ``http_method`` or
        ``http_check_condition`` is not a supported constant, or if
        ``condition`` is missing while ``http_check_condition`` is not
        STATUS_CODE_DEFAULT.

    Example::

        http_task = Http(
            name="http_task",
            url="https://api.example.com",
            http_method="POST",
            http_params={"key": "value"},
        )
    """

    # Custom attributes specific to the Http task.
    _task_custom_attr = {
        "url",
        "http_method",
        "http_params",
        "http_check_condition",
        "condition",
        "connect_timeout",
        "socket_timeout",
    }

    def __init__(
        self,
        name: str,
        url: str,
        http_method: str | None = HttpMethod.GET,
        http_params: dict | None = None,
        http_check_condition: str | None = HttpCheckCondition.STATUS_CODE_DEFAULT,
        condition: str | None = None,
        connect_timeout: int | None = 60000,
        socket_timeout: int | None = 60000,
        *args,
        **kwargs,
    ):
        super().__init__(name, TaskType.HTTP, *args, **kwargs)
        self.url = url

        if not self._is_declared_constant(HttpMethod, http_method):
            # Exceptions do not %-format their arguments, so interpolate here.
            raise PyDSParamException(
                f"Parameter http_method {http_method} not support."
            )

        if isinstance(http_params, list):
            warnings.warn(
                "The `http_params` parameter currently accepts a dictionary instead of a list. Your parameter is being ignored.",
                DeprecationWarning,
                # Attribute the warning to the code that built the task.
                stacklevel=2,
            )
            # Honour the warning text: drop the list rather than passing it
            # on to the dictionary-based parameter conversion.
            http_params = None

        self.http_method = http_method
        self._http_params = http_params

        if not self._is_declared_constant(HttpCheckCondition, http_check_condition):
            raise PyDSParamException(
                f"Parameter http_check_condition {http_check_condition} not support."
            )
        self.http_check_condition = http_check_condition

        if (
            http_check_condition != HttpCheckCondition.STATUS_CODE_DEFAULT
            and condition is None
        ):
            raise PyDSParamException(
                "Parameter condition must provider if http_check_condition not equal to STATUS_CODE_DEFAULT"
            )
        self.condition = condition
        self.connect_timeout = connect_timeout
        self.socket_timeout = socket_timeout

    @staticmethod
    def _is_declared_constant(holder: type, value: object) -> bool:
        """Return True when *value* names a public constant declared on *holder*.

        A plain ``hasattr`` check would also accept dunder attributes such as
        ``__doc__`` or inherited ``type`` attributes such as ``mro``, and would
        raise ``TypeError`` for non-string values such as ``None``.
        """
        return (
            isinstance(value, str)
            and not value.startswith("_")
            and value in vars(holder)
        )

    @property
    def http_params(self):
        """Return http_params converted by ParameterHelper, or ``[]`` when unset or empty."""
        return (
            ParameterHelper.convert_params(self._http_params, direction=Direction.IN)
            if self._http_params
            else []
        )