ResourcePlugin
ResourcePlugin is the abstract base class for the resource plugins that can be passed as a parameter to task subclasses and workflows. Every resource plugin must inherit from it and override its abstract methods.
Code
class ResourcePlugin(object, metaclass=ABCMeta):
    """ResourcePlugin object, declare resource plugin for task and workflow to dolphinscheduler.

    :param prefix: A string representing the prefix of ResourcePlugin.
    """

    # [start init_method]
    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    # [end init_method]

    # [start abstractmethod read_file]
    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file.

        The address of the file is the prefix of the resource plugin plus the parameter suf.
        """

    # [end abstractmethod read_file]

    def get_index(self, s: str, x, n):
        """Find the subscript of the nth occurrence of the X character in the string s."""
        if n <= s.count(x):
            all_index = [key for key, value in enumerate(s) if value == x]
            return all_index[n - 1]
        else:
            raise PyResPluginException("Incomplete path.")
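To make the behavior of get_index concrete, here is a minimal standalone sketch of the same logic. It is written as a plain function so it runs without pydolphinscheduler installed; the sample URL is only an illustration, and ValueError stands in for PyResPluginException.

```python
def get_index(s: str, x: str, n: int) -> int:
    """Return the index of the n-th (1-based) occurrence of character x in s."""
    if n <= s.count(x):
        all_index = [key for key, value in enumerate(s) if value == x]
        return all_index[n - 1]
    # Stand-in for PyResPluginException, used here to avoid the dependency.
    raise ValueError("Incomplete path.")


# Illustrative path only; any string containing the separator works.
path = "https://github.com/apache/dolphinscheduler"
third_slash = get_index(path, "/", 3)

print(third_slash)             # 18
print(path[third_slash + 1:])  # apache/dolphinscheduler
```

Asking for more occurrences than the string contains, for example get_index("a/b", "/", 2), raises the exception instead of returning an index.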
Dive Into
ResourcePlugin has the following key methods.
Method __init__: Takes a parameter prefix of type str, which is the prefix of the resource location.
You can override this method if necessary.
def __init__(self, prefix: str, *args, **kwargs):
    self.prefix = prefix
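As a rough sketch of what overriding __init__ could look like, the example below adds one extra argument to a subclass. The ResourcePlugin class here is a local stand-in that mirrors the one shown on this page so the snippet runs on its own; TokenResource and its access_token parameter are hypothetical names, not part of the library.

```python
from abc import ABCMeta, abstractmethod
from typing import Optional


class ResourcePlugin(metaclass=ABCMeta):
    """Local stand-in mirroring the base class shown on this page."""

    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file at prefix + suf."""


class TokenResource(ResourcePlugin):
    """Hypothetical plugin that needs a credential in addition to the prefix."""

    def __init__(self, prefix: str, access_token: Optional[str] = None, *args, **kwargs):
        # Let the base class store the prefix, then keep the extra argument.
        super().__init__(prefix, *args, **kwargs)
        self.access_token = access_token

    def read_file(self, suf: str):
        # Placeholder: a real plugin would fetch prefix + suf here.
        raise NotImplementedError


plugin = TokenResource("https://example.com/repo", access_token="secret")
print(plugin.prefix)
```

Calling super().__init__ keeps self.prefix available, which read_file relies on to build the full file address.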
Method read_file: Gets the content of a file. Its parameter suf is the suffix of the file path.
The prefix was already set in the __init__ of the resource plugin.
The prefix plus the suffix is the absolute path of the file in this resource.
read_file is an abstract method, so every subclass must override it.
@abstractmethod
def read_file(self, suf: str):
    """Get the content of the file.

    The address of the file is the prefix of the resource plugin plus the parameter suf.
    """
DolphinScheduler ResourcePlugin object.
- class pydolphinscheduler.core.resource_plugin.ResourcePlugin(prefix: str, *args, **kwargs)
Bases: object
ResourcePlugin object, declare resource plugin for task and workflow to dolphinscheduler.
- Parameters:
prefix – A string representing the prefix of ResourcePlugin.
- get_index(s: str, x, n)
Find the subscript of the nth occurrence of the X character in the string s.
- abstract read_file(suf: str)
Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
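The contract described above (a prefix set at construction, plus an abstract read_file that resolves prefix + suf) can be illustrated with a small concrete subclass. This is only a sketch: ResourcePlugin is redefined locally as a stand-in so the code runs without the library, and DirectoryResource is a hypothetical name rather than one of the shipped plugins.

```python
import tempfile
from abc import ABCMeta, abstractmethod
from pathlib import Path


class ResourcePlugin(metaclass=ABCMeta):
    """Local stand-in mirroring the base class documented above."""

    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file at prefix + suf."""


class DirectoryResource(ResourcePlugin):
    """Hypothetical plugin that reads files below a local directory prefix."""

    def read_file(self, suf: str) -> str:
        path = Path(self.prefix).joinpath(suf)
        if not path.is_file():
            raise FileNotFoundError(f"{path} is not found")
        return path.read_text()


# Create a throwaway directory with one script and read it back via the plugin.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "resource.sh").write_text("echo tutorial resource plugin")
    content = DirectoryResource(tmp).read_file("resource.sh")

print(content)  # echo tutorial resource plugin

# The abstract base class itself cannot be instantiated.
try:
    ResourcePlugin("/tmp")
except TypeError as exc:
    print("TypeError:", exc)
```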
How to use
Resource plugins can be used in task subclasses and workflows. To use one, pass the resource_plugin parameter when the task or workflow is initialized. For example, to use the local resource plugin, pass resource_plugin=Local("/tmp").
The resource plugins currently supported are Local, GitHub, GitLab, OSS, and S3.
Here is an example.
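import os
from pathlib import Path

# Workflow, Shell, and Local are provided by pydolphinscheduler; import them
# from the module paths that match your installed version.

with Workflow(
    name="tutorial_resource_plugin",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
    resource_plugin=Local("/tmp"),
) as workflow:
    # Write a script under /tmp so the Local plugin can find it.
    file = "resource.sh"
    path = Path("/tmp").joinpath(file)
    with open(str(path), "w") as f:
        f.write("echo tutorial resource plugin")
    # The command is the file name relative to the plugin prefix.
    task_parent = Shell(
        name="local-resource-example",
        command=file,
    )
    print(task_parent.task_params)
    # Clean up the temporary script.
    os.remove(path)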
When resource_plugin is defined in both the task subclass and the workflow, the one defined in the task subclass takes precedence.
If the task subclass does not define resource_plugin but the workflow does, the workflow's resource_plugin is used.
If neither the task subclass nor the workflow specifies resource_plugin, the command is executed as a script, exactly as it was before resource plugins existed;
in other words, the feature is backward compatible.
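The precedence rules can be summarized in a few lines of code. The sketch below is a simplified model and not the library's implementation: resolve_command and InMemoryPlugin are hypothetical names, and the real library may apply further checks before it treats a command as a file path.

```python
from typing import Dict, Optional


class InMemoryPlugin:
    """Hypothetical stand-in exposing read_file, backed by a dict."""

    def __init__(self, files: Dict[str, str]):
        self.files = files

    def read_file(self, suf: str) -> str:
        return self.files[suf]


def resolve_command(
    command: str,
    task_plugin: Optional[InMemoryPlugin] = None,
    workflow_plugin: Optional[InMemoryPlugin] = None,
) -> str:
    """Apply the precedence: task plugin, then workflow plugin, else raw script."""
    plugin = task_plugin if task_plugin is not None else workflow_plugin
    if plugin is None:
        # No plugin anywhere: the command is the script itself.
        return command
    return plugin.read_file(command)


task_plugin = InMemoryPlugin({"resource.sh": "echo from task plugin"})
workflow_plugin = InMemoryPlugin({"resource.sh": "echo from workflow plugin"})

print(resolve_command("resource.sh", task_plugin, workflow_plugin))  # echo from task plugin
print(resolve_command("resource.sh", None, workflow_plugin))         # echo from workflow plugin
print(resolve_command("echo hello"))                                 # echo hello
```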