Source code for pydolphinscheduler.resources_plugin.github

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""DolphinScheduler github resource plugin."""
import base64
from typing import Optional
from urllib.parse import urljoin

import requests

from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.git import Git, GitHubFileInfo


class GitHub(ResourcePlugin, Git):
    """GitHub resource plugin, a plugin for task and workflow to dolphinscheduler to read resource.

    :param prefix: A string representing the prefix of GitHub.
    :param access_token: A string used for identity authentication of GitHub private repository.
    """

    # File information parsed from the most recent path given to
    # ``get_git_file_info``; stays ``None`` until that method has run.
    _git_file_info: Optional[GitHubFileInfo] = None

    # Seconds handed to ``requests.get`` as ``timeout``. Without a timeout,
    # requests waits forever on a server that never answers. Override on a
    # subclass or instance if a different limit is needed.
    _request_timeout: float = 30

    def __init__(
        self, prefix: str, access_token: Optional[str] = None, *args, **kwargs
    ):
        super().__init__(prefix, *args, **kwargs)
        self.access_token = access_token

    def build_req_api(
        self,
        user: str,
        repo_name: str,
        file_path: str,
        api: str,
    ) -> str:
        """Build request file content API.

        Substitute the literal placeholders ``{user}``, ``{repo_name}`` and
        ``{file_path}`` found in ``api`` with the given values.

        :param user: Value inserted for ``{user}``.
        :param repo_name: Value inserted for ``{repo_name}``.
        :param file_path: Value inserted for ``{file_path}``.
        :param api: URL template containing the placeholders.
        :return: The template with all three placeholders replaced.
        """
        # Plain ``str.replace`` (not ``str.format``) so that any other braces
        # present in the template or in the values are left untouched.
        api = api.replace("{user}", user)
        api = api.replace("{repo_name}", repo_name)
        api = api.replace("{file_path}", file_path)
        return api

    def get_git_file_info(self, path: str) -> None:
        """Get file information from the file url, like repository name, user, branch, and file path.

        The result is stored on ``self._git_file_info``; nothing is returned.

        :param path: URL of the file. It is split on ``Symbol.SLASH`` and
            segments 3, 4 and 6 are taken as user, repository name and branch.
            NOTE(review): this matches URLs shaped like
            ``https://github.com/<user>/<repo>/blob/<branch>/<file path>`` --
            confirm against callers.
        """
        elements = path.split(Symbol.SLASH)
        # ``get_index`` is inherited and not visible here; it is presumably the
        # position of the 7th slash, so everything after it is the file path.
        file_path_start = self.get_index(path, Symbol.SLASH, 7) + 1
        self._git_file_info = GitHubFileInfo(
            user=elements[3],
            repo_name=elements[4],
            branch=elements[6],
            file_path=path[file_path_start:],
        )

    def get_req_url(self) -> str:
        """Build request URL according to file information.

        Reads ``self._git_file_info``, so ``get_git_file_info`` must have been
        called beforehand.

        :return: The GitHub "contents" API URL for the parsed file.
        """
        return self.build_req_api(
            user=self._git_file_info.user,
            repo_name=self._git_file_info.repo_name,
            file_path=self._git_file_info.file_path,
            api="https://api.github.com/repos/{user}/{repo_name}/contents/{file_path}",
        )

    def read_file(self, suf: str) -> str:
        """Get the content of the file.

        The address of the file is the prefix of the resource plugin plus the parameter suf.

        :param suf: Suffix joined onto ``self.prefix`` with ``urljoin``.
        :return: The decoded text content returned by ``req``.
        """
        path = urljoin(self.prefix, suf)
        return self.req(path)

    def req(self, path: str) -> str:
        """Send HTTP request, parse response data, and get file content.

        :param path: URL of the file; parsed with ``get_git_file_info``.
        :return: The file content, base64-decoded and then decoded as UTF-8.
        :raises Exception: When the response status is not OK. The exception
            argument is the parsed JSON body when the body is valid JSON,
            otherwise a string with the status code and the raw body text.
        """
        headers = {
            "Content-Type": "application/json; charset=utf-8",
        }
        if self.access_token is not None:
            headers["Authorization"] = f"Bearer {self.access_token}"

        self.get_git_file_info(path)
        response = requests.get(
            headers=headers,
            url=self.get_req_url(),
            # The branch is sent as the ``ref`` query parameter.
            params={"ref": self._git_file_info.branch},
            timeout=self._request_timeout,
        )

        if response.status_code != requests.codes.ok:
            # Error bodies are not guaranteed to be JSON (e.g. an HTML page
            # from an intermediate proxy). Fall back to the raw text so the
            # HTTP failure is reported instead of a JSON decoding error.
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = f"HTTP {response.status_code}: {response.text}"
            raise Exception(error_detail)

        content = base64.b64decode(response.json()["content"])
        return content.decode("utf-8")