DVC

An example of the DVC task type, followed by a dive into its implementation in PyDolphinScheduler.

Example

"""A example workflow for task dvc."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload

repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

with Workflow(
    name="task_dvc_example",
) as workflow:
    init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
    upload_task = DVCUpload(
        name="upload_data",
        repository=repository,
        data_path_in_dvc_repository="iris",
        data_path_in_worker="~/source/iris",
        version="v1",
        message="upload iris data v1",
    )

    download_task = DVCDownload(
        name="download_data",
        repository=repository,
        data_path_in_dvc_repository="iris",
        data_path_in_worker="~/target/iris",
        version="v1",
    )

    init_task >> upload_task >> download_task

    workflow.run()

Dive Into

Task DVC.

class pydolphinscheduler.tasks.dvc.BaseDVC(name: str, repository: str, *args, **kwargs)[source]

Bases: BatchTask

Base class for dvc task.

_child_task_dvc_attr = {}
_task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
dvc_task_type = None
property task_params: dict

Return task params.

class pydolphinscheduler.tasks.dvc.DVCDownload(name: str, repository: str, data_path_in_dvc_repository: str, data_path_in_worker: str, version: str, *args, **kwargs)[source]

Bases: BaseDVC

Task DVC Download object; declares the behavior of the DVC Download task for DolphinScheduler.

_child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_version'}
_downstream_task_codes: set[int]
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
dvc_task_type = 'Download'
class pydolphinscheduler.tasks.dvc.DVCInit(name: str, repository: str, store_url: str, *args, **kwargs)[source]

Bases: BaseDVC

Task DVC Init object; declares the behavior of the DVC Init task for DolphinScheduler.

_child_task_dvc_attr = {'dvc_store_url'}
_downstream_task_codes: set[int]
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
dvc_task_type = 'Init DVC'
class pydolphinscheduler.tasks.dvc.DVCUpload(name: str, repository: str, data_path_in_worker: str, data_path_in_dvc_repository: str, version: str, message: str, *args, **kwargs)[source]

Bases: BaseDVC

Task DVC Upload object; declares the behavior of the DVC Upload task for DolphinScheduler.

_child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_message', 'dvc_version'}
_downstream_task_codes: set[int]
_task_relation: set[TaskRelation]
_timeout: timedelta | int
_upstream_task_codes: set[int]
dvc_task_type = 'Upload'
class pydolphinscheduler.tasks.dvc.DvcTaskType[source]

Bases: str

Constants for dvc task type.

DOWNLOAD = 'Download'
INIT = 'Init DVC'
UPLOAD = 'Upload'

YAML file example

# Define a reusable anchor `repository` (referenced below as *repository).
# Replace the placeholder with your own user or organization name.
repository: &repository "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

# Define the workflow
workflow:
  name: "DVC"
  release_state: "offline"

# Define the tasks within the workflow. `deps` makes them run strictly in
# order init_dvc -> upload_data -> download_data, so data is never
# downloaded before it has been uploaded.
tasks:
  - name: init_dvc
    task_type: DVCInit
    repository: *repository
    store_url: ~/dvc_data

  - name: upload_data
    task_type: DVCUpload
    deps: [init_dvc]
    repository: *repository
    data_path_in_dvc_repository: "iris"
    data_path_in_worker: ~/source/iris
    version: v1
    message: upload iris data v1

  - name: download_data
    task_type: DVCDownload
    deps: [upload_data]
    repository: *repository
    data_path_in_dvc_repository: "iris"
    data_path_in_worker: ~/target/iris
    version: v1