# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler github resource plugin."""
from __future__ import annotations
import base64
from urllib.parse import urljoin
import requests
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.git import Git, GitHubFileInfo
[docs]
class GitHub(ResourcePlugin, Git):
"""GitHub resource plugin, a plugin for task and workflow to dolphinscheduler to read resource.
:param prefix: A string representing the prefix of GitHub.
:param access_token: A string used for identity authentication of GitHub private repository.
"""
def __init__(self, prefix: str, access_token: str | None = None, *args, **kwargs):
super().__init__(prefix, *args, **kwargs)
self.access_token = access_token
_git_file_info: GitHubFileInfo | None = None
[docs]
def build_req_api(
self,
user: str,
repo_name: str,
file_path: str,
api: str,
):
"""Build request file content API."""
api = api.replace("{user}", user)
api = api.replace("{repo_name}", repo_name)
api = api.replace("{file_path}", file_path)
return api
[docs]
def get_git_file_info(self, path: str):
"""Get file information from the file url, like repository name, user, branch, and file path."""
elements = path.split(Symbol.SLASH)
index = self.get_index(path, Symbol.SLASH, 7)
index = index + 1
file_info = GitHubFileInfo(
user=elements[3],
repo_name=elements[4],
branch=elements[6],
file_path=path[index:],
)
self._git_file_info = file_info
[docs]
def get_req_url(self):
"""Build request URL according to file information."""
return self.build_req_api(
user=self._git_file_info.user,
repo_name=self._git_file_info.repo_name,
file_path=self._git_file_info.file_path,
api="https://api.github.com/repos/{user}/{repo_name}/contents/{file_path}",
)
[docs]
def read_file(self, suf: str):
"""Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
"""
path = urljoin(self.prefix, suf)
return self.req(path)
[docs]
def req(self, path: str):
"""Send HTTP request, parse response data, and get file content."""
headers = {
"Content-Type": "application/json; charset=utf-8",
}
if self.access_token is not None:
headers.setdefault("Authorization", f"Bearer {self.access_token}")
self.get_git_file_info(path)
response = requests.get(
headers=headers,
url=self.get_req_url(),
params={"ref": self._git_file_info.branch},
)
if response.status_code == requests.codes.ok:
json_response = response.json()
content = base64.b64decode(json_response["content"])
return content.decode("utf-8")
else:
raise Exception(response.json())