-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_deployment.py
95 lines (87 loc) · 3.5 KB
/
run_deployment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from typing import cast
import logging
import click
from pipelines.deployment_pipeline import continuous_deployment_pipeline, inference_pipeline
from rich import print
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
DEPLOY = 'deploy'
PREDICT = 'predict'
DEPLOY_AND_PREDICT = 'deploy_and_predict'
@click.command()
@click.option(
'--mode',
'-m',
type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),
default=DEPLOY_AND_PREDICT,
help="Choose the mode to run the deployment.\n"
"(`deploy`) - Deploy the model\n"
"(`predict`) - Make predictions using the deployed model\n"
"(`deploy_and_predict`) - Deploy the model and make predictions using the deployed model\n"
"By default, it is set to `deploy_and_predict`"
)
@click.option(
'--min-accuracy',
default=0,
help='Minimum accuracy required for deployment'
)
def main(mode: str, min_accuracy: float) -> None:
mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()
deploy = mode == DEPLOY or mode == DEPLOY_AND_PREDICT
predict = mode == PREDICT or mode == DEPLOY_AND_PREDICT
if deploy:
# Initialize a continuous deployment pipeline run
continuous_deployment_pipeline(
min_accuracy=min_accuracy,
workers=3,
timeout=60,
)
if predict:
# Initialize an inference pipeline run
inference_pipeline(
pipeline_name="continuous_deployment_pipeline",
pipeline_step_name="mlflow_model_deployer_step",
)
print(
"You can run:\n "
f"[italic green] mlflow ui --backend-store-uri '{get_tracking_uri()}"
"[/italic green]\n ...to inspect your experiment runs within the MLflow"
" UI.\nYou can find your runs tracked within the "
"`mlflow_example_pipeline` experiment. There you'll also be able to "
"compare two or more runs.\n\n"
)
# fetch existing services with same pipeline name, step name and model name
existing_services = mlflow_model_deployer_component.find_model_server(
pipeline_name="continuous_deployment_pipeline",
pipeline_step_name="mlflow_model_deployer_step",
model_name="model",
)
print("--->", existing_services)
if existing_services:
service = cast(MLFlowDeploymentService, existing_services[0])
if service.is_running:
print(
f"The MLflow prediction server is running locally as a daemon "
f"process service and accepts inference requests at:\n"
f" {service.prediction_url}\n"
f"To stop the service, run "
f"[italic green]`zenml model-deployer models delete "
f"{str(service.uuid)}`[/italic green]."
)
elif service.is_failed:
print(
f"The MLflow prediction server is in a failed state:\n"
f" Last state: '{service.status.state.value}'\n"
f" Last error: '{service.status.last_error}'"
)
else:
print(
"No MLflow prediction server is currently running. The deployment "
"pipeline must run first to train a model and deploy it. Execute "
"the same command with the `--deploy` argument to deploy a model."
)
if __name__ == '__main__':
main()