ResourcePlugin

ResourcePlugin is the abstract base class for the resource plugins that task subclasses and workflows accept as a parameter. Every resource plugin must inherit from it and override its abstract methods.

Code

class ResourcePlugin(object, metaclass=ABCMeta):
    """ResourcePlugin object, declare resource plugin for task and workflow to dolphinscheduler.

    :param prefix: A string representing the prefix of ResourcePlugin.

    """

    # [start init_method]
    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    # [end init_method]

    # [start abstractmethod read_file]
    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file.

        The address of the file is the prefix of the resource plugin plus the parameter suf.
        """

    # [end abstractmethod read_file]

    def get_index(self, s: str, x, n):
        """Find the subscript of the nth occurrence of the X character in the string s."""
        if n <= s.count(x):
            all_index = [key for key, value in enumerate(s) if value == x]
            return all_index[n - 1]
        else:
            raise PyResPluginException("Incomplete path.")
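
To make the behavior of get_index concrete, here is a minimal standalone sketch of the same logic. It is not the library code itself: it is written as a plain function, it raises ValueError in place of PyResPluginException so that it runs without pydolphinscheduler installed, and the sample path is a made-up value.

```python
def get_index(s: str, x: str, n: int) -> int:
    """Return the index of the nth occurrence (1-based) of character x in s."""
    if n <= s.count(x):
        # Collect the positions of every occurrence of x, then pick the nth one.
        all_index = [key for key, value in enumerate(s) if value == x]
        return all_index[n - 1]
    # Assumption: ValueError stands in for PyResPluginException here.
    raise ValueError("Incomplete path.")


# Hypothetical path, used only for illustration.
path = "owner/repo/branch/file.sh"

second_slash = get_index(path, "/", 2)
print(second_slash)         # 10
print(path[:second_slash])  # owner/repo
```

Asking for more occurrences than the string contains, for example get_index("a/b", "/", 2), raises the exception, which is how an incomplete path is reported.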


Dive Into

It has the following key methods.

  • Method __init__: takes a parameter prefix of type str, which is the prefix of the resource.

You can override this method if necessary.

    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

  • Method read_file: gets the content of the file at the given URI. Its parameter suf is the suffix of the file path.

The file prefix is set in __init__ of the resource plugin.

The prefix plus the suffix is the absolute path of the file in this resource.

It is an abstract method, so every subclass must override it.

    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file.

        The address of the file is the prefix of the resource plugin plus the parameter suf.
        """

DolphinScheduler ResourcePlugin object.

class pydolphinscheduler.core.resource_plugin.ResourcePlugin(prefix: str, *args, **kwargs)[source]

Bases: object

ResourcePlugin object, declare resource plugin for task and workflow to dolphinscheduler.

Parameters:

prefix – A string representing the prefix of ResourcePlugin.

get_index(s: str, x, n)[source]

Find the subscript of the nth occurrence of the X character in the string s.

abstract read_file(suf: str)[source]

Get the content of the file.

The address of the file is the prefix of the resource plugin plus the parameter suf.
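
As an illustration of the contract described above, the following sketch shows how a subclass might override read_file. It is written under stated assumptions: the ResourcePlugin class here is a stand-in that mirrors the __init__ and read_file signatures shown on this page so the example runs without pydolphinscheduler installed, and DirectoryPlugin and demo are hypothetical names, not part of the library.

```python
import tempfile
from abc import ABCMeta, abstractmethod
from pathlib import Path


class ResourcePlugin(object, metaclass=ABCMeta):
    """Stand-in for the base class, mirroring the signatures shown above."""

    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file located at prefix plus suf."""


class DirectoryPlugin(ResourcePlugin):
    """Hypothetical plugin that reads text files below a local directory."""

    def read_file(self, suf: str):
        # The file address is the plugin prefix joined with the suffix.
        return Path(self.prefix).joinpath(suf).read_text()


def demo() -> str:
    """Write a small script into a temporary directory and read it back."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "resource.sh").write_text("echo hello")
        return DirectoryPlugin(tmp).read_file("resource.sh")


print(demo())  # echo hello
```

Because read_file is abstract, instantiating the base class directly raises TypeError; only subclasses that override it can be created.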


How to use

A resource plugin can be used in task subclasses and workflows. To use one, pass the resource_plugin parameter when initializing them. For example, to use the local resource plugin, add resource_plugin=Local("/tmp").

The resource plugins currently supported are local, GitHub, GitLab, OSS, and S3.

Here is an example.

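import os
from pathlib import Path

# Import paths assumed from the pydolphinscheduler package layout;
# adjust them to match your installed version.
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.resources_plugin.local import Local
from pydolphinscheduler.tasks.shell import Shell

with Workflow(
    name="tutorial_resource_plugin",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
    resource_plugin=Local("/tmp"),
) as workflow:
    # Create the script that the local resource plugin will read from /tmp.
    file = "resource.sh"
    path = Path("/tmp").joinpath(file)
    with open(str(path), "w") as f:
        f.write("echo tutorial resource plugin")
    # The command is a file name; it is resolved through the workflow's plugin.
    task_parent = Shell(
        name="local-resource-example",
        command=file,
    )
    print(task_parent.task_params)
    # Clean up the temporary script.
    os.remove(path)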

When the resource_plugin parameter is defined in both the task subclass and the workflow, the resource_plugin defined in the task subclass takes precedence.

If the task subclass does not define resource_plugin, but the resource_plugin is defined in the workflow, the resource_plugin in the workflow is used.

If neither the task subclass nor the workflow specifies resource_plugin, the command is executed as a script, exactly as it is without resource plugins. In other words, the behavior is backward compatible.
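
The precedence rules above can be sketched as a small function. This is an illustration of the documented order only, not the library's implementation; resolve_command and DictPlugin are hypothetical names, and the plugin here serves file contents from an in-memory dict instead of a real resource.

```python
class DictPlugin:
    """Hypothetical plugin that serves file contents from a dict."""

    def __init__(self, prefix: str, files: dict):
        self.prefix = prefix
        self.files = files

    def read_file(self, suf: str) -> str:
        # The file address is the prefix plus the suffix.
        return self.files[self.prefix + suf]


def resolve_command(command: str, task_plugin=None, workflow_plugin=None) -> str:
    """Apply the documented order: task plugin, then workflow plugin, then none."""
    plugin = task_plugin if task_plugin is not None else workflow_plugin
    if plugin is None:
        # No plugin anywhere: the command is used as the script itself.
        return command
    return plugin.read_file(command)


task_plugin = DictPlugin("/task/", {"/task/a.sh": "echo from task plugin"})
workflow_plugin = DictPlugin("/wf/", {"/wf/a.sh": "echo from workflow plugin"})

print(resolve_command("a.sh", task_plugin, workflow_plugin))  # echo from task plugin
print(resolve_command("a.sh", None, workflow_plugin))         # echo from workflow plugin
print(resolve_command("echo hi"))                             # echo hi
```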