MLflow
An example of the MLflow task type in PyDolphinScheduler, followed by a dive into its API.
Example
"""An example workflow for task mlflow."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.mlflow import (
    MLflowDeployType,
    MLflowModels,
    MLFlowProjectsAutoML,
    MLFlowProjectsBasicAlgorithm,
    MLFlowProjectsCustom,
)

mlflow_tracking_uri = "http://127.0.0.1:5000"

with Workflow(
    name="task_mlflow_example",
    tenant="tenant_exists",
) as workflow:
    # Run a custom MLflow project to train a model.
    train_custom = MLFlowProjectsCustom(
        name="train_xgboost_native",
        repository="https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native",
        mlflow_tracking_uri=mlflow_tracking_uri,
        parameters="-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9",
        experiment_name="xgboost",
    )

    # Run AutoML to train a model.
    train_automl = MLFlowProjectsAutoML(
        name="train_automl",
        mlflow_tracking_uri=mlflow_tracking_uri,
        parameters="time_budget=30;estimator_list=['lgbm']",
        experiment_name="automl_iris",
        model_name="iris_A",
        automl_tool="flaml",
        data_path="/data/examples/iris",
    )

    # Use DOCKER to deploy the model trained by train_automl.
    deploy_docker = MLflowModels(
        name="deploy_docker",
        model_uri="models:/iris_A/Production",
        mlflow_tracking_uri=mlflow_tracking_uri,
        deploy_mode=MLflowDeployType.DOCKER,
        port=7002,
    )

    train_automl >> deploy_docker

    # Run LightGBM to train a model.
    train_basic_algorithm = MLFlowProjectsBasicAlgorithm(
        name="train_basic_algorithm",
        mlflow_tracking_uri=mlflow_tracking_uri,
        parameters="n_estimators=200;learning_rate=0.2",
        experiment_name="basic_algorithm_iris",
        model_name="iris_B",
        algorithm="lightgbm",
        data_path="/data/examples/iris",
        search_params="max_depth=[5, 10];n_estimators=[100, 200]",
    )

    # Use MLFLOW to deploy the model trained by the LightGBM project.
    deploy_mlflow = MLflowModels(
        name="deploy_mlflow",
        model_uri="models:/iris_B/Production",
        mlflow_tracking_uri=mlflow_tracking_uri,
        deploy_mode=MLflowDeployType.MLFLOW,
        port=7001,
    )

    train_basic_algorithm >> deploy_mlflow

    workflow.submit()
Dive Into
Task mlflow.
- class pydolphinscheduler.tasks.mlflow.BaseMLflow(name: str, mlflow_tracking_uri: str, *args, **kwargs)[source]
Bases: Task
Base MLflow task.
- _child_task_mlflow_attr = {}
- _task_custom_attr: set = {'mlflow_task_type', 'mlflow_tracking_uri'}
- mlflow_task_type = None
- property task_params: Dict
Return task params.
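The task_params property is what folds these attribute sets into the parameters sent to DolphinScheduler. The snippet below is an illustrative sketch of that pattern only, not the library's actual implementation: it collects every attribute named in _task_custom_attr and _child_task_mlflow_attr into a single dict.

# Illustrative sketch (not the library code): gather the attributes named in
# the two class-level sets into one parameter dict.
class SketchMLflowTask:
    _task_custom_attr = {"mlflow_task_type", "mlflow_tracking_uri"}
    _child_task_mlflow_attr = {"experiment_name", "params"}

    def __init__(self, mlflow_tracking_uri, experiment_name, params):
        self.mlflow_task_type = "MLflow Projects"
        self.mlflow_tracking_uri = mlflow_tracking_uri
        self.experiment_name = experiment_name
        self.params = params

    @property
    def task_params(self):
        # Every attribute named in either set becomes a key in the result.
        attrs = self._task_custom_attr | self._child_task_mlflow_attr
        return {attr: getattr(self, attr) for attr in attrs}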
- class pydolphinscheduler.tasks.mlflow.MLFlowProjectsAutoML(name: str, data_path: str, automl_tool: str | None = 'flaml', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow projects object; declares the behavior of the AutoML task for DolphinScheduler.
- Parameters:
name – task name
data_path – data path of the MLflow Project; supports a git address or a directory on the worker.
automl_tool – the AutoML tool to use; currently supports autosklearn and flaml.
mlflow_tracking_uri – MLflow tracking server URI, default is http://127.0.0.1:5000
experiment_name – MLflow experiment name, default is empty
model_name – MLflow model name, default is empty
parameters – MLflow project parameters, default is empty
- _child_task_mlflow_attr = {'automl_tool', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- mlflow_job_type = 'AutoML'
- mlflow_task_type = 'MLflow Projects'
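As a minimal standalone sketch of this class (separate from the full example above), the snippet below trains with autosklearn instead of flaml; the tenant name and data path are placeholders carried over from that example.

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.mlflow import MLFlowProjectsAutoML

with Workflow(name="automl_only_example", tenant="tenant_exists") as workflow:
    # Train with autosklearn; project parameters are left at their defaults.
    MLFlowProjectsAutoML(
        name="train_automl_autosklearn",
        mlflow_tracking_uri="http://127.0.0.1:5000",
        experiment_name="automl_iris",
        model_name="iris_automl",
        automl_tool="autosklearn",
        data_path="/data/examples/iris",
    )
    workflow.submit()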
- class pydolphinscheduler.tasks.mlflow.MLFlowProjectsBasicAlgorithm(name: str, data_path: str, algorithm: str | None = 'lightgbm', mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', model_name: str | None = '', parameters: str | None = '', search_params: str | None = '', *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow projects object; declares the behavior of the BasicAlgorithm task for DolphinScheduler.
- Parameters:
name – task name
data_path – data path of the MLflow Project; supports a git address or a directory on the worker.
algorithm – the selected algorithm; currently supports LR, SVM, LightGBM and XGBoost based on the scikit-learn form.
mlflow_tracking_uri – MLflow tracking server URI, default is http://127.0.0.1:5000
experiment_name – MLflow experiment name, default is empty
model_name – MLflow model name, default is empty
parameters – MLflow project parameters, default is empty
search_params – the parameter space to search when tuning, default is empty (no search)
- _child_task_mlflow_attr = {'algorithm', 'data_path', 'experiment_name', 'mlflow_job_type', 'model_name', 'params', 'register_model', 'search_params'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- mlflow_job_type = 'BasicAlgorithm'
- mlflow_task_type = 'MLflow Projects'
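The sketch below varies the example above: it trains XGBoost instead of LightGBM and grid-searches two hyperparameters through search_params. The lowercase algorithm identifier and the parameter names are assumptions carried over from that example, not verified values.

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.mlflow import MLFlowProjectsBasicAlgorithm

with Workflow(name="basic_algorithm_xgboost_example", tenant="tenant_exists") as workflow:
    # Train XGBoost and search over max_depth and learning_rate.
    MLFlowProjectsBasicAlgorithm(
        name="train_xgboost",
        mlflow_tracking_uri="http://127.0.0.1:5000",
        parameters="n_estimators=100",
        experiment_name="basic_algorithm_iris",
        model_name="iris_xgb",
        algorithm="xgboost",
        data_path="/data/examples/iris",
        search_params="max_depth=[3, 6];learning_rate=[0.1, 0.2]",
    )
    workflow.submit()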
- class pydolphinscheduler.tasks.mlflow.MLFlowProjectsCustom(name: str, repository: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', experiment_name: str | None = '', parameters: str | None = '', version: str | None = 'master', *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow projects object; declares the behavior of the MLflow Custom projects task for DolphinScheduler.
- Parameters:
name – task name
repository – repository URL of the MLflow Project; supports a git address or a directory on the worker. If the project sits in a subdirectory, append # to reference it (same as mlflow run), for example https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native.
mlflow_tracking_uri – MLflow tracking server URI, default is http://127.0.0.1:5000
experiment_name – MLflow experiment name, default is empty
parameters – MLflow project parameters, default is empty
version – MLflow project version, default is master
- _child_task_mlflow_attr = {'experiment_name', 'mlflow_job_type', 'mlflow_project_repository', 'mlflow_project_version', 'params'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- mlflow_job_type = 'CustomProject'
- mlflow_task_type = 'MLflow Projects'
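Because version defaults to master, pinning a project to another git reference only needs that one extra argument. The sketch below keeps the repository from the example above; the version value is a placeholder to replace with the branch, tag, or commit you need.

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.mlflow import MLFlowProjectsCustom

with Workflow(name="custom_project_pinned_example", tenant="tenant_exists") as workflow:
    MLFlowProjectsCustom(
        name="train_pinned_version",
        repository="https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native",
        version="master",  # placeholder: replace with the branch, tag, or commit to pin
        mlflow_tracking_uri="http://127.0.0.1:5000",
        experiment_name="xgboost",
    )
    workflow.submit()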
- class pydolphinscheduler.tasks.mlflow.MLflowDeployType[source]
Bases: str
MLflow deploy type.
- DOCKER = 'DOCKER'
- MLFLOW = 'MLFLOW'
- class pydolphinscheduler.tasks.mlflow.MLflowJobType[source]
Bases: str
MLflow job type.
- AUTOML = 'AutoML'
- BASIC_ALGORITHM = 'BasicAlgorithm'
- CUSTOM_PROJECT = 'CustomProject'
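The attributes on these two constant classes are plain strings, so they compare equal to the raw values used in the YAML example below and can be passed wherever a deploy mode or job type string is expected. A quick sanity check:

from pydolphinscheduler.tasks.mlflow import MLflowDeployType, MLflowJobType

# The constants are ordinary strings, interchangeable with raw literals.
assert MLflowDeployType.DOCKER == "DOCKER"
assert MLflowDeployType.MLFLOW == "MLFLOW"
assert MLflowJobType.BASIC_ALGORITHM == "BasicAlgorithm"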
- class pydolphinscheduler.tasks.mlflow.MLflowModels(name: str, model_uri: str, mlflow_tracking_uri: str | None = 'http://127.0.0.1:5000', deploy_mode: str | None = 'DOCKER', port: int | None = 7000, *args, **kwargs)[source]
Bases: BaseMLflow
Task MLflow models object; declares the behavior of the MLflow models task for DolphinScheduler.
Deploys machine learning models in diverse serving environments.
- Parameters:
name – task name
model_uri – MLflow model URI; supports the models:/<model_name>/suffix format and the runs:/ format. See https://mlflow.org/docs/latest/tracking.html#artifact-stores
mlflow_tracking_uri – MLflow tracking server URI, default is http://127.0.0.1:5000
deploy_mode – MLflow deploy mode; supports MLFLOW and DOCKER, default is DOCKER
port – deploy port, default is 7000
- _child_task_mlflow_attr = {'deploy_model_key', 'deploy_port', 'deploy_type'}
- _downstream_task_codes: Set[int]
- _task_relation: Set[TaskRelation]
- _timeout: timedelta
- _upstream_task_codes: Set[int]
- mlflow_task_type = 'MLflow Models'
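The example above deploys from the model registry with a models:/ URI; since runs:/ URIs are also supported, a deployment can point at a tracked run directly. In the sketch below the run ID is a placeholder and the port is arbitrary.

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.mlflow import MLflowDeployType, MLflowModels

with Workflow(name="deploy_from_run_example", tenant="tenant_exists") as workflow:
    # Serve the model artifact logged under a specific run (placeholder ID).
    MLflowModels(
        name="deploy_from_run",
        model_uri="runs:/<run_id>/model",
        mlflow_tracking_uri="http://127.0.0.1:5000",
        deploy_mode=MLflowDeployType.MLFLOW,
        port=7003,
    )
    workflow.submit()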
YAML file example
# Define variable `mlflow_tracking_uri`
mlflow_tracking_uri: &mlflow_tracking_uri "http://127.0.0.1:5000"

# Define the workflow
workflow:
  name: "MLflow"

# Define the tasks within the workflow
tasks:
  - name: train_xgboost_native
    task_type: MLFlowProjectsCustom
    repository: https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native
    mlflow_tracking_uri: *mlflow_tracking_uri
    parameters: -P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9
    experiment_name: xgboost

  - name: train_automl
    task_type: MLFlowProjectsAutoML
    mlflow_tracking_uri: *mlflow_tracking_uri
    parameters: time_budget=30;estimator_list=['lgbm']
    experiment_name: automl_iris
    model_name: iris_A
    automl_tool: flaml
    data_path: /data/examples/iris

  - name: deploy_docker
    task_type: MLflowModels
    deps: [train_automl]
    model_uri: models:/iris_A/Production
    mlflow_tracking_uri: *mlflow_tracking_uri
    deploy_mode: DOCKER
    port: 7002

  - name: train_basic_algorithm
    task_type: MLFlowProjectsBasicAlgorithm
    mlflow_tracking_uri: *mlflow_tracking_uri
    parameters: n_estimators=200;learning_rate=0.2
    experiment_name: basic_algorithm_iris
    model_name: iris_B
    algorithm: lightgbm
    data_path: /data/examples/iris
    search_params: max_depth=[5, 10];n_estimators=[100, 200]

  - name: deploy_mlflow
    deps: [train_basic_algorithm]
    task_type: MLflowModels
    model_uri: models:/iris_B/Production
    mlflow_tracking_uri: *mlflow_tracking_uri
    deploy_mode: MLFLOW
    port: 7001
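Assuming the pydolphinscheduler command line tool is available in your installation, a YAML definition like this is typically submitted with its yaml subcommand (for example pydolphinscheduler yaml -f mlflow.yaml; check pydolphinscheduler yaml --help for the exact option in your version).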