MLflow

A MLflow task type’s example and dive into information of PyDolphinScheduler.

Example

"""A example workflow for task mlflow."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.mlflow import (
    MLflowDeployType,
    MLflowModels,
    MLFlowProjectsAutoML,
    MLFlowProjectsBasicAlgorithm,
    MLFlowProjectsCustom,
)

mlflow_tracking_uri = "http://127.0.0.1:5000"

with Workflow(
    name="task_mlflow_example",
) as workflow:
    # run custom mlflow project to train model
    train_custom = MLFlowProjectsCustom(
        name="train_xgboost_native",
        repository="https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native",
        mlflow_tracking_uri=mlflow_tracking_uri,
        parameters="-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9",
        experiment_name="xgboost",
    )

    # run automl to train model
    train_automl = MLFlowProjectsAutoML(
        name="train_automl",
        mlflow_tracking_uri=mlflow_tracking_uri,
        parameters="time_budget=30;estimator_list=['lgbm']",
        experiment_name="automl_iris",
        model_name="iris_A",
        automl_tool="flaml",
        data_path="/data/examples/iris",
    )

    # Using DOCKER to deploy model from train_automl
    deploy_docker = MLflowModels(
        name="deploy_docker",
        model_uri="models:/iris_A/Production",
        mlflow_tracking_uri=mlflow_tracking_uri,
        deploy_mode=MLflowDeployType.DOCKER,
        port=7002,
    )

    train_automl >> deploy_docker

    # run lightgbm to train model
    train_basic_algorithm = MLFlowProjectsBasicAlgorithm(
        name="train_basic_algorithm",
        mlflow_tracking_uri=mlflow_tracking_uri,
        parameters="n_estimators=200;learning_rate=0.2",
        experiment_name="basic_algorithm_iris",
        model_name="iris_B",
        algorithm="lightgbm",
        data_path="/data/examples/iris",
        search_params="max_depth=[5, 10];n_estimators=[100, 200]",
    )

    # Using MLFLOW to deploy model from training lightgbm project
    deploy_mlflow = MLflowModels(
        name="deploy_mlflow",
        model_uri="models:/iris_B/Production",
        mlflow_tracking_uri=mlflow_tracking_uri,
        deploy_mode=MLflowDeployType.MLFLOW,
        port=7001,
    )

    train_basic_algorithm >> deploy_mlflow

    workflow.submit()

Dive Into

Task mlflow.

class pydolphinscheduler.tasks.mlflow.BaseMLflow(name: str, mlflow_tracking_uri: str, *args, **kwargs)[source]

Bases: Task

Base MLflow task.

_child_task_mlflow_attr = {}
_task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
mlflow_task_type = None
property task_params: Dict

Return task params.

class pydolphinscheduler.tasks.mlflow.MLFlowProjectsAutoML(name: str, data_path: str, automl_tool: str | None = 'flaml', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow projects object, declare behavior for AutoML task to dolphinscheduler.

Parameters:
  • name – task name

  • data_path – data path of MLflow Project, Support git address and directory on worker.

  • automl_tool – The AutoML tool used, currently supports autosklearn and flaml.

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • experiment_name – MLflow experiment name, default is empty

  • model_name – MLflow model name, default is empty

  • parameters – MLflow project parameters, default is empty

_child_task_mlflow_attr = {'automl_tool', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model'}
_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
mlflow_job_type = 'AutoML'
mlflow_task_type = 'MLflow Projects'
class pydolphinscheduler.tasks.mlflow.MLFlowProjectsBasicAlgorithm(name: str, data_path: str, algorithm: str | None = 'lightgbm', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', search_params: str | None = '', *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow projects object, declare behavior for BasicAlgorithm task to dolphinscheduler.

Parameters:
  • name – task name

  • data_path – data path of MLflow Project, Support git address and directory on worker.

  • algorithm – The selected algorithm currently supports LR, SVM, LightGBM and XGboost based on scikit-learn form.

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • experiment_name – MLflow experiment name, default is empty

  • model_name – MLflow model name, default is empty

  • parameters – MLflow project parameters, default is empty

  • search_params – Whether to search the parameters, default is empty

_child_task_mlflow_attr = {'algorithm', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model', 'search_params'}
_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
mlflow_job_type = 'BasicAlgorithm'
mlflow_task_type = 'MLflow Projects'
class pydolphinscheduler.tasks.mlflow.MLFlowProjectsCustom(name: str, repository: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', parameters: str | None = '', version: str | None = 'master', *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow projects object, declare behavior for MLflow Custom projects task to dolphinscheduler.

Parameters:
  • name – task name

  • repository – Repository url of MLflow Project, Support git address and directory on worker. If it’s in a subdirectory, We add # to support this (same as mlflow run) , for example https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native.

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • experiment_name – MLflow experiment name, default is empty

  • parameters – MLflow project parameters, default is empty

  • version – MLflow project version, default is master

_child_task_mlflow_attr = {'experiment_name', 'mlflow_job_type', 'mlflow_project_repository', 'mlflow_project_version', 'params'}
_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
mlflow_job_type = 'CustomProject'
mlflow_task_type = 'MLflow Projects'
class pydolphinscheduler.tasks.mlflow.MLflowDeployType[source]

Bases: str

MLflow deploy type.

DOCKER = 'DOCKER'
MLFLOW = 'MLFLOW'
class pydolphinscheduler.tasks.mlflow.MLflowJobType[source]

Bases: str

MLflow job type.

AUTOML = 'AutoML'
BASIC_ALGORITHM = 'BasicAlgorithm'
CUSTOM_PROJECT = 'CustomProject'
class pydolphinscheduler.tasks.mlflow.MLflowModels(name: str, model_uri: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', deploy_mode: str | None = 'DOCKER', port: int | None = 7000, *args, **kwargs)[source]

Bases: BaseMLflow

Task MLflow models object, declare behavior for MLflow models task to dolphinscheduler.

Deploy machine learning models in diverse serving environments.

Parameters:
  • name – task name

  • model_uri – Model-URI of MLflow , support models:/<model_name>/suffix format and runs:/ format. See https://mlflow.org/docs/latest/tracking.html#artifact-stores

  • mlflow_tracking_uri – MLflow tracking server uri, default is http://127.0.0.1:5000

  • deploy_mode – MLflow deploy mode, support MLFLOW, DOCKER, default is DOCKER

  • port – deploy port, default is 7000

_child_task_mlflow_attr = {'deploy_model_key', 'deploy_port', 'deploy_type'}
_downstream_task_codes: Set[int]
_task_relation: Set[TaskRelation]
_timeout: timedelta
_upstream_task_codes: Set[int]
mlflow_task_type = 'MLflow Models'
class pydolphinscheduler.tasks.mlflow.MLflowTaskType[source]

Bases: str

MLflow task type.

MLFLOW_MODELS = 'MLflow Models'
MLFLOW_PROJECTS = 'MLflow Projects'

YAML file example

# Define variable `mlflow_tracking_uri`
mlflow_tracking_uri: &mlflow_tracking_uri "http://127.0.0.1:5000" 

# Define the workflow
workflow:
  name: "MLflow"

# Define the tasks within the workflow
tasks:
  - name: train_xgboost_native
    task_type: MLFlowProjectsCustom 
    repository: https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native
    mlflow_tracking_uri: *mlflow_tracking_uri
    parameters: -P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9
    experiment_name: xgboost

  - name: train_automl
    task_type: MLFlowProjectsAutoML 
    mlflow_tracking_uri: *mlflow_tracking_uri
    parameters: time_budget=30;estimator_list=['lgbm']
    experiment_name: automl_iris
    model_name: iris_A
    automl_tool: flaml
    data_path: /data/examples/iris

  - name: deploy_docker
    task_type: MLflowModels 
    deps: [train_automl]
    model_uri: models:/iris_A/Production
    mlflow_tracking_uri: *mlflow_tracking_uri
    deploy_mode: DOCKER
    port: 7002

  - name: train_basic_algorithm
    task_type: MLFlowProjectsBasicAlgorithm 
    mlflow_tracking_uri: *mlflow_tracking_uri
    parameters: n_estimators=200;learning_rate=0.2
    experiment_name: basic_algorithm_iris
    model_name: iris_B
    algorithm: lightgbm
    data_path: /data/examples/iris
    search_params: max_depth=[5, 10];n_estimators=[100, 200]

  - name: deploy_mlflow
    deps: [train_basic_algorithm]
    task_type: MLflowModels
    model_uri: models:/iris_B/Production
    mlflow_tracking_uri: *mlflow_tracking_uri
    deploy_mode: MLFLOW
    port: 7001