SageMaker

An example of the SageMaker task type, followed by a detailed look at its PyDolphinScheduler API.

Example

"""An example workflow for the SageMaker task."""
import json

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.sagemaker import SageMaker

sagemaker_request_data = {
    "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
    "PipelineExecutionDescription": "test Pipeline",
    "PipelineExecutionDisplayName": "AbalonePipeline",
    "PipelineName": "AbalonePipeline",
    "PipelineParameters": [
        {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
        {"Name": "ProcessingInstanceCount", "Value": "2"},
    ],
}

with ProcessDefinition(
    name="task_sagemaker_example",
    tenant="tenant_exists",
) as pd:
    task_sagemaker = SageMaker(
        name="task_sagemaker",
        sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
    )

    pd.run()

Dive Into

Task SageMaker.

class pydolphinscheduler.tasks.sagemaker.SageMaker(name: str, sagemaker_request_json: str, *args, **kwargs)[source]

Bases: Task

SageMaker task object; declares the behavior of a SageMaker task to DolphinScheduler.

Parameters:
  • name – A unique, meaningful string for the SageMaker task.

  • sagemaker_request_json – The request parameters for StartPipelineExecution, passed as a JSON string; see also the AWS API documentation.

_downstream_task_codes: Set[int]
_task_custom_attr: set = {'sagemaker_request_json'}
_task_relation: Set[TaskRelation]
_upstream_task_codes: Set[int]
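
Because sagemaker_request_json is a plain string, the request is usually built as a dictionary and serialized just before it is handed to the task. The following is a minimal sketch of that step, not part of PyDolphinScheduler: the helper name build_request_json and its arguments are hypothetical, and only the standard json module is used. It assumes the Name/Value list layout shown in the example above, with every parameter value sent as a string.

```python
import json


def build_request_json(pipeline_name, parameters, display_name=None):
    """Serialize StartPipelineExecution request fields to a JSON string.

    Hypothetical helper for illustration; not a PyDolphinScheduler API.
    """
    request = {
        "PipelineName": pipeline_name,
        # Pipeline parameters are written as a list of Name/Value pairs.
        # Values are converted to strings, matching the example above.
        "PipelineParameters": [
            {"Name": key, "Value": str(value)}
            for key, value in parameters.items()
        ],
    }
    # Optional fields are only emitted when the caller supplies them.
    if display_name is not None:
        request["PipelineExecutionDisplayName"] = display_name
    return json.dumps(request, indent=2)


request_json = build_request_json(
    "AbalonePipeline",
    {"ProcessingInstanceType": "ml.m4.xlarge", "ProcessingInstanceCount": 2},
)
print(request_json)
```

The resulting string could then be passed as sagemaker_request_json when constructing the task, in the same way the example workflow passes the output of json.dumps.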

YAML file example

# Define the workflow
workflow:
  name: "Sagemaker"
  release_state: "offline"

# Define the tasks under the process
tasks:
  - name: sagemaker
    task_type: Sagemaker
    sagemaker_request_json: $FILE{"example_sagemaker_params.json"}

example_sagemaker_params.json:

{
    "ParallelismConfiguration":{
        "MaxParallelExecutionSteps":1
    },
    "PipelineExecutionDescription":"run pipeline using ds",
    "PipelineExecutionDisplayName":"ds-sagemaker-pipeline",
    "PipelineName":"DsSagemakerPipeline",
    "PipelineParameters":[
        {
            "Name":"InputData",
            "Value": "s3://sagemaker/dataset/dataset.csv"
        },
        {
            "Name":"InferenceData",
            "Value": "s3://sagemaker/dataset/inference.csv"
        }
    ]
}
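
Since the YAML definition pulls the request from a separate file, it can help to check that file before submitting the workflow. The sketch below is one possible pre-flight check and is not something PyDolphinScheduler provides: the function name check_request_file and the specific rules (a non-empty PipelineName, string Name and Value in each parameter) are assumptions based on the example file above. The demo writes its own temporary files so it can run anywhere.

```python
import json
import tempfile
from pathlib import Path


def check_request_file(path):
    """Return a list of problems found in a request JSON file (empty if none).

    Hypothetical helper for illustration; the rules are assumptions.
    """
    try:
        data = json.loads(Path(path).read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(data, dict):
        return ["top-level value must be a JSON object"]

    problems = []
    name = data.get("PipelineName")
    if not isinstance(name, str) or not name:
        problems.append("PipelineName must be a non-empty string")

    params = data.get("PipelineParameters", [])
    if not isinstance(params, list):
        problems.append("PipelineParameters must be a list")
        return problems
    for index, param in enumerate(params):
        # Each entry should be an object with string Name and Value fields.
        ok = isinstance(param, dict) and all(
            isinstance(param.get(key), str) for key in ("Name", "Value")
        )
        if not ok:
            problems.append(
                f"PipelineParameters[{index}] needs string Name and Value"
            )
    return problems


# Demo on two temporary files: one well-formed, one with mistakes.
with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.json"
    good.write_text(json.dumps({
        "PipelineName": "DsSagemakerPipeline",
        "PipelineParameters": [
            {"Name": "InputData", "Value": "s3://sagemaker/dataset/dataset.csv"}
        ],
    }), encoding="utf-8")
    bad = Path(tmp) / "bad.json"
    bad.write_text(json.dumps({
        "PipelineParameters": [{"Name": "InputData", "Value": 3}],
    }), encoding="utf-8")

    good_problems = check_request_file(good)
    bad_problems = check_request_file(bad)

print(good_problems)
print(bad_problems)
```

Returning a list of messages rather than raising on the first error lets all problems in a file be reported in one pass.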