diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 8059d24..5069dfe 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -245,22 +245,20 @@ stages:
         displayName: 'Import ML Batch Inference script'
 
       - script: |
-          id
           echo $HOME
-          echo "[registry]" > $HOME/.databrickscfg
+          ls $(Build.Repository.LocalPath)/libraries/python/dbxdemo/
+          echo >> $HOME/.databrickscfg
+          echo "[registry]" >> $HOME/.databrickscfg
           echo "host=$(WORKSPACE_REGION_URL)" >> $HOME/.databrickscfg
           echo "token=$(DATABRICKS_TOKEN)" >> $HOME/.databrickscfg
           cat /home/vsts/.databrickscfg
           mkdir -p /home/vsts/mlflow/$(MODEL_NAME)/artifacts
-        displayName: 'Configure mlflow client connection to mlflow global (per org) model registry'
-
-      - task: PythonScript@0
-        inputs:
-          scriptSource: 'filePath'
-          scriptPath: '$(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py'
-          arguments: '--output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)'
+          python $(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py --output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)
+          ls -latr /home/vsts/mlflow/$(MODEL_NAME)/artifacts
+          dbfs cp --recursive --overwrite /home/vsts/mlflow/$(MODEL_NAME)/artifacts/ dbfs:/FileStore/Shared/db-automation/mlflow/$(MODEL_NAME)/model/ --profile AZDO
         displayName: 'Retrieve artifacts from the mlflow global (per org) model registry to use them in Databricks Staging'
+
       - task: liprec.vsts-publish-adf.deploy-adf-json.deploy-adf-json@2
         displayName: 'Deploy Data Pipeline to $(STAGING_ADF_NAME) ADF'
         inputs:
@@ -276,7 +274,7 @@ stages:
           scriptType: bash
           scriptLocation: inlineScript
           inlineScript: |
-            python $(Build.Repository.LocalPath)/cicd-scripts/adf_pipeline_run.py -r $(RESOURCE_GROUP) -a $(STAGING_ADF_NAME) -p $(STAGING_ADF_PIPELINE_NAME) -o ./logs/json -pa "{\"environment\":\"-s staging\"}"
+            python $(Build.Repository.LocalPath)/cicd-scripts/adf_pipeline_run.py -r $(RESOURCE_GROUP) -a $(STAGING_ADF_NAME) -p $(STAGING_ADF_PIPELINE_NAME) -o ./logs/json -pa "{\"environment\":\"--stage=staging\", \"model_path\":\"--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/$(MODEL_NAME)\"}"
           useGlobalConfig: true
         timeoutInMinutes: 10
@@ -348,20 +346,17 @@ stages:
         displayName: 'Import ML Batch Inference script'
 
       - script: |
-          id
           echo $HOME
-          echo "[registry]" > $HOME/.databrickscfg
+          ls $(Build.Repository.LocalPath)/libraries/python/dbxdemo/
+          echo >> $HOME/.databrickscfg
+          echo "[registry]" >> $HOME/.databrickscfg
           echo "host=$(WORKSPACE_REGION_URL)" >> $HOME/.databrickscfg
           echo "token=$(DATABRICKS_TOKEN)" >> $HOME/.databrickscfg
           cat /home/vsts/.databrickscfg
           mkdir -p /home/vsts/mlflow/$(MODEL_NAME)/artifacts
-        displayName: 'Configure mlflow client connection to global mlflow global (per org) model registry'
-
-      - task: PythonScript@0
-        inputs:
-          scriptSource: 'filePath'
-          scriptPath: '$(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py'
-          arguments: '--output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)'
+          python $(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py --output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)
+          ls -latr /home/vsts/mlflow/$(MODEL_NAME)/artifacts
+          dbfs cp --recursive --overwrite /home/vsts/mlflow/$(MODEL_NAME)/artifacts/ dbfs:/FileStore/Shared/db-automation/mlflow/$(MODEL_NAME)/model/ --profile AZDO
         displayName: 'Retrieve artifacts from the mlflow global (per org) model registry to use them in Databricks Production'
 
       - task: liprec.vsts-publish-adf.deploy-adf-json.deploy-adf-json@2
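Note on the "--model $(MODEL_NAME)" flag in the inline scripts above: remote_registry_mlflow.py (next file) only defines "--model_name", but argparse accepts unambiguous option prefixes, so "--model" still binds to "--model_name". A minimal sketch of that behaviour, with the option names copied from the script and placeholder values:

    # Sketch: argparse resolves "--model" to "--model_name" because no other option
    # shares that prefix (allow_abbrev defaults to True). Values are placeholders.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-o", "--output_local_path", required=True)
    parser.add_argument("-m", "--model_name", required=True)
    parser.add_argument("-s", "--stage", default="staging", required=False)

    args = parser.parse_args(["--output_local_path=/tmp/artifacts", "--model", "wine-model"])
    print(args.model_name)  # -> wine-model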
diff --git a/cicd-scripts/remote_registry_mlflow.py b/cicd-scripts/remote_registry_mlflow.py
index 5320710..dc84aa9 100644
--- a/cicd-scripts/remote_registry_mlflow.py
+++ b/cicd-scripts/remote_registry_mlflow.py
@@ -11,6 +11,7 @@
 from mlflow.utils.string_utils import strip_prefix
 from mlflow.exceptions import MlflowException
 from mlflow.tracking import artifact_utils
+from mlflow.tracking import MlflowClient
 
 
 def _get_dbfs_endpoint(artifact_uri, artifact_path):
@@ -64,11 +65,13 @@ def main():
     parser = argparse.ArgumentParser(description="Execute python scripts in Databricks")
     parser.add_argument("-o", "--output_local_path", help="Output path where the artifacts will be written", required=True)
     parser.add_argument("-m", "--model_name", help="Model Registry Name", required=True)
+    parser.add_argument("-s", "--stage", help="Stage", default="staging", required=False)
 
     args = parser.parse_args()
     model_name = args.model_name
     output_local_path = args.output_local_path
+    stage = args.stage
 
     cli_profile_name = "registry"
     # TODO: Document that we assume that the registry profile will be created in the local machine:
@@ -78,11 +81,11 @@ def main():
     TRACKING_URI = f"databricks://{cli_profile_name}"
     print(f"TRACKING_URI: {TRACKING_URI}")
     artifact_path = 'model'
-    from mlflow.tracking import MlflowClient
+
     remote_client = MlflowClient(tracking_uri=TRACKING_URI)
     mlflow.set_tracking_uri(TRACKING_URI)
     # client = mlflow.tracking.MlflowClient()
-    latest_model = remote_client.get_latest_versions(name=model_name, stages=["staging"])
+    latest_model = remote_client.get_latest_versions(name=model_name, stages=[stage])
     print(f"Latest Model: {latest_model}")
     run_id = latest_model[0].run_id
     artifact_uri = artifact_utils.get_artifact_uri(run_id)
@@ -103,7 +106,6 @@ def main():
 
     # remote_client = MlflowClient(tracking_uri=TRACKING_URI)
 
-
 
 if __name__ == '__main__':
     main()
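For context, the flow the hunks above parameterize can be exercised on its own roughly as follows. This is a minimal sketch, assuming the [registry] profile written by the pipeline step exists in ~/.databrickscfg and that a model named mlops-wine-model is registered; the download call is only illustrative and is not necessarily how the unshown part of this script fetches artifacts.

    # Sketch: resolve the latest version of a model for a given stage from the
    # central registry workspace, then download its 'model' artifacts locally.
    import os
    import mlflow
    from mlflow.tracking import MlflowClient, artifact_utils

    tracking_uri = "databricks://registry"        # profile created by the CI step
    mlflow.set_tracking_uri(tracking_uri)
    client = MlflowClient(tracking_uri=tracking_uri)

    stage = "staging"                             # the new --stage argument
    latest = client.get_latest_versions(name="mlops-wine-model", stages=[stage])
    run_id = latest[0].run_id
    print("artifact root:", artifact_utils.get_artifact_uri(run_id))

    output_local_path = "/tmp/mlflow-artifacts"   # the --output_local_path argument
    os.makedirs(output_local_path, exist_ok=True)
    local_dir = client.download_artifacts(run_id, "model", dst_path=output_local_path)
    print("downloaded to:", local_dir)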
diff --git a/pipeline/ML/inference/batch_model.py b/pipeline/ML/inference/batch_model.py
index f1e22d8..34b747d 100644
--- a/pipeline/ML/inference/batch_model.py
+++ b/pipeline/ML/inference/batch_model.py
@@ -3,6 +3,8 @@
 import requests
 import mlflow
 import mlflow.sklearn
+from mlflow.tracking import MlflowClient
+from mlflow.tracking import artifact_utils
 from mlflow import pyfunc
 import json
 from pyspark.sql.functions import col
@@ -39,20 +41,24 @@ def main():
     parser.add_argument(
         "-t", "--table_name", help="Output Table name",
         default="mlops_wine_quality_regression", required=False)
-    # parser.add_argument("-p", "--phase", help="Phase", default="qa", required=True)
+    parser.add_argument(
+        "-p", "--model_path", help="Model's artifacts path",
+        default="/dbfs/FileStore/Shared/db-automation/mlflow/wine-model", required=True)
 
     args = parser.parse_args()
     model_name = args.model_name
     home = args.root_path
-    stage = args.stage
+    stage = args.stage.replace(" ", "")
     db = args.db_name.replace("@", "_").replace(".", "_")
     ml_output_predictions_table = args.table_name
+    model_path = args.model_path
 
     print(f"Model name: {model_name}")
     print(f"home: {home}")
     print(f"stage: {stage}")
     print(f"db: {db}")
     print(f"ml_output_predictions_table: {ml_output_predictions_table}")
+    print(f"model_path: {model_path}")
 
     print("batch_inference")
     temp_data_path = f"/dbfs/tmp/mlflow-wine-quality.csv"
@@ -61,8 +67,25 @@
     wine_df = spark.read.format("csv").option("header", "true").load(dbfs_wine_data_path).drop("quality").cache()
     wine_df = wine_df.select(*(col(column).cast("float").alias(column.replace(" ", "_")) for column in wine_df.columns))
     data_spark = wine_df
+    model_artifact = 'model'
+    artifact_uri = model_path
+    print(f"artifact_uri: {artifact_uri}")
+    model_uri = f"{artifact_uri}/{model_artifact}"
+    print(f"model_uri: {model_uri}")
+    udf = pyfunc.spark_udf(spark, model_uri)
+
+    # data_spark = spark.read.csv(dbfs_wine_data_path, header=True)
+    predictions = data_spark.select(udf(*data_spark.columns).alias('prediction'), "*")
+    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
+    spark.sql(f"DROP TABLE IF EXISTS {db}.{ml_output_predictions_table}")
+    predictions.write.format("delta").mode("overwrite").saveAsTable(f"{db}.{ml_output_predictions_table}")
+    output = json.dumps({
+        "model_name": model_name,
+        "model_uri": model_uri
+    })
+    print(output)
 
 
 if __name__ == '__main__':
diff --git a/resources/adf/pipeline/data_ml_pipeline.json b/resources/adf/pipeline/data_ml_pipeline.json
index 803c70f..bad13d1 100644
--- a/resources/adf/pipeline/data_ml_pipeline.json
+++ b/resources/adf/pipeline/data_ml_pipeline.json
@@ -79,8 +79,11 @@
                 "typeProperties": {
                     "pythonFile": "dbfs:/FileStore/Shared/db-automation/ML/batch_model.py",
                     "parameters": [
-                        "-m mlops-wine-model -o -r dbfs:/FileStore/Shared/db-automation -t test_table",
-                        "@pipeline().parameters.environment"
+                        "--model_name=mlops-wine-model",
+                        "@pipeline().parameters.environment",
+                        "--root_path=dbfs:/FileStore/Shared/db-automation",
+                        "--table_name=wine_output_table_1",
+                        "@pipeline().parameters.model_path"
                     ]
                 },
                 "linkedServiceName": {
@@ -92,7 +95,11 @@
         "parameters": {
            "environment": {
                 "type": "string",
-                "defaultValue": "-s test"
+                "defaultValue": "--stage=test"
+            },
+            "model_path": {
+                "type": "string",
+                "defaultValue": "--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/mlops-wine-model"
             }
         },
         "annotations": []
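One note on the ADF changes: each element of the "parameters" array is handed to batch_model.py as a separate command-line argument, which is why the pipeline parameters carry the flag names inside their default values ("--stage=test", "--model_path=...") and presumably why the old single-string form was split up. A rough sketch of how the script's parser sees that list; the parser shape is approximated from the arguments visible in this patch (db_name and the other defaults are omitted):

    # Sketch: how batch_model.py's argparse receives the ADF "parameters" list.
    # Each array element becomes one argv entry, so the two @pipeline().parameters.*
    # values must already contain the flag name ("--stage=...", "--model_path=...").
    import argparse

    argv = [
        "--model_name=mlops-wine-model",
        "--stage=test",    # expansion of @pipeline().parameters.environment
        "--root_path=dbfs:/FileStore/Shared/db-automation",
        "--table_name=wine_output_table_1",
        "--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/mlops-wine-model",
    ]

    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--model_name")
    parser.add_argument("-s", "--stage", default="staging")
    parser.add_argument("-r", "--root_path")
    parser.add_argument("-t", "--table_name")
    parser.add_argument("-p", "--model_path")

    args = parser.parse_args(argv)
    print(args.stage, args.model_path)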