DVC
An example of the DVC task type, followed by a detailed look at its API in PyDolphinScheduler.
Example
"""An example workflow for the DVC task."""
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload
repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
with Workflow(
    name="task_dvc_example",
) as workflow:
    init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
    upload_task = DVCUpload(
        name="upload_data",
        repository=repository,
        data_path_in_dvc_repository="iris",
        data_path_in_worker="~/source/iris",
        version="v1",
        message="upload iris data v1",
    )
    download_task = DVCDownload(
        name="download_data",
        repository=repository,
        data_path_in_dvc_repository="iris",
        data_path_in_worker="~/target/iris",
        version="v1",
    )
    init_task >> upload_task >> download_task
    workflow.run()
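The line `init_task >> upload_task >> download_task` in the example above chains the three tasks so that each runs after the previous one. The following is a minimal sketch of how such a chaining operator can work in plain Python. `SketchTask` and its `downstream` list are hypothetical stand-ins for illustration only, not the PyDolphinScheduler `Task` class, which may track dependencies differently.

```python
class SketchTask:
    """Hypothetical stand-in for a task; not the pydolphinscheduler Task class."""

    def __init__(self, name: str):
        self.name = name
        self.downstream = []  # names of tasks that should run after this one

    def __rshift__(self, other: "SketchTask") -> "SketchTask":
        # `a >> b` records b as downstream of a and returns b,
        # so a chain such as `a >> b >> c` continues from b.
        self.downstream.append(other.name)
        return other


init_task = SketchTask("init_dvc")
upload_task = SketchTask("upload_data")
download_task = SketchTask("download_data")

# Evaluated left to right: (init_task >> upload_task) >> download_task
last = init_task >> upload_task >> download_task

print(init_task.downstream)
print(upload_task.downstream)
print(last.name)
```

Returning the right-hand operand is what makes a chain of `>>` read in execution order.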
Dive Into
Task classes for DVC.
- class pydolphinscheduler.tasks.dvc.BaseDVC(name: str, repository: str, *args, **kwargs)[source]
Bases: Task
Base class for DVC tasks.
- _child_task_dvc_attr = {}
- _task_custom_attr: set = {'dvc_repository', 'dvc_task_type'}
- dvc_task_type = None
- property task_params: Dict
Return task params.
- class pydolphinscheduler.tasks.dvc.DVCDownload(name: str, repository: str, data_path_in_dvc_repository: str, data_path_in_worker: str, version: str, *args, **kwargs)[source]
Bases: BaseDVC
DVC Download task object; declares the behavior of a DVC Download task for DolphinScheduler.
- _child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_version'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- dvc_task_type = 'Download'
- class pydolphinscheduler.tasks.dvc.DVCInit(name: str, repository: str, store_url: str, *args, **kwargs)[source]
Bases: BaseDVC
DVC Init task object; declares the behavior of a DVC Init task for DolphinScheduler.
- _child_task_dvc_attr = {'dvc_store_url'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- dvc_task_type = 'Init DVC'
- class pydolphinscheduler.tasks.dvc.DVCUpload(name: str, repository: str, data_path_in_worker: str, data_path_in_dvc_repository: str, version: str, message: str, *args, **kwargs)[source]
Bases: BaseDVC
DVC Upload task object; declares the behavior of a DVC Upload task for DolphinScheduler.
- _child_task_dvc_attr = {'dvc_data_location', 'dvc_load_save_data_path', 'dvc_message', 'dvc_version'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- dvc_task_type = 'Upload'
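The classes above each list a `_task_custom_attr` set on the base class and a `_child_task_dvc_attr` set on the subclass. The following is a rough sketch of how a `task_params` property could combine the two sets into one parameter dictionary. `SketchBaseDVC`, `SketchDVCInit`, and `snake_to_camel` are hypothetical names used only for illustration, and the camelCase key conversion is an assumption rather than documented behavior; the library's actual implementation may differ.

```python
from typing import Dict, Set


def snake_to_camel(name: str) -> str:
    """Convert e.g. 'dvc_store_url' to 'dvcStoreUrl' (assumed key style)."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


class SketchBaseDVC:
    """Hypothetical stand-in for BaseDVC; not the library class."""

    dvc_task_type = None
    _task_custom_attr: Set[str] = {"dvc_repository", "dvc_task_type"}
    _child_task_dvc_attr: Set[str] = set()

    def __init__(self, name: str, repository: str):
        self.name = name
        self.dvc_repository = repository

    @property
    def task_params(self) -> Dict[str, object]:
        # Union of the attributes shared by all DVC tasks and the subclass-specific ones.
        attrs = self._task_custom_attr | self._child_task_dvc_attr
        return {snake_to_camel(attr): getattr(self, attr) for attr in sorted(attrs)}


class SketchDVCInit(SketchBaseDVC):
    """Hypothetical stand-in mirroring the attribute sets listed for DVCInit."""

    dvc_task_type = "Init DVC"
    _child_task_dvc_attr = {"dvc_store_url"}

    def __init__(self, name: str, repository: str, store_url: str):
        super().__init__(name, repository)
        self.dvc_store_url = store_url


params = SketchDVCInit(
    "init_dvc", "git@example.com:org/repo.git", "~/dvc_data"
).task_params
print(params)
```

With this layout, a subclass only has to declare its own attribute names and its `dvc_task_type`; the base class handles collecting them.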
YAML file example
# Define variable `repository`
repository: &repository "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
# Define the workflow
workflow:
  name: "DVC"
  release_state: "offline"
# Define the tasks within the workflow
tasks:
  - name: init_dvc
    task_type: DVCInit
    repository: *repository
    store_url: ~/dvc_data
  - name: upload_data
    task_type: DVCUpload
    repository: *repository
    data_path_in_dvc_repository: "iris"
    data_path_in_worker: ~/source/iris
    version: v1
    message: upload iris data v1
  - name: download_data
    task_type: DVCDownload
    repository: *repository
    data_path_in_dvc_repository: "iris"
    data_path_in_worker: ~/target/iris
    version: v1
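Each task entry in the YAML file above carries the same keys as the required constructor arguments of the class named in `task_type`. The following is a small sketch of a check for missing keys before submitting such a file. The `REQUIRED` table is copied by hand from the class signatures documented above, `missing_keys` is a hypothetical helper and not part of PyDolphinScheduler, and the task dictionaries stand in for the YAML entries after the `*repository` alias has been resolved.

```python
from typing import Dict, Set

# Required arguments per task type, taken from the signatures documented above.
REQUIRED: Dict[str, Set[str]] = {
    "DVCInit": {"name", "repository", "store_url"},
    "DVCUpload": {
        "name", "repository", "data_path_in_worker",
        "data_path_in_dvc_repository", "version", "message",
    },
    "DVCDownload": {
        "name", "repository", "data_path_in_dvc_repository",
        "data_path_in_worker", "version",
    },
}


def missing_keys(task: Dict[str, str]) -> Set[str]:
    """Return the required keys that a task entry does not define (hypothetical helper)."""
    return REQUIRED[task["task_type"]] - task.keys()


repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

# Plain-dict equivalents of two of the YAML task entries.
init_entry = {
    "name": "init_dvc",
    "task_type": "DVCInit",
    "repository": repository,
    "store_url": "~/dvc_data",
}
upload_entry_without_message = {
    "name": "upload_data",
    "task_type": "DVCUpload",
    "repository": repository,
    "data_path_in_dvc_repository": "iris",
    "data_path_in_worker": "~/source/iris",
    "version": "v1",
}

print(missing_keys(init_entry))
print(missing_keys(upload_entry_without_message))
```

The second entry deliberately omits `message`, so the helper reports it as missing for `DVCUpload`.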