Merge pull request #30 from MiguelPeralvo/dev
Dev
MiguelPeralvo authored Jun 21, 2020
2 parents 2284b21 + 4d54743 commit 31ee0c4
Showing 4 changed files with 54 additions and 27 deletions.
33 changes: 14 additions & 19 deletions azure-pipelines.yml
@@ -245,22 +245,20 @@ stages:
displayName: 'Import ML Batch Inference script'
- script: |
id
echo $HOME
echo "[registry]" > $HOME/.databrickscfg
ls $(Build.Repository.LocalPath)/libraries/python/dbxdemo/
echo >> $HOME/.databrickscfg
echo "[registry]" >> $HOME/.databrickscfg
echo "host=$(WORKSPACE_REGION_URL)" >> $HOME/.databrickscfg
echo "token=$(DATABRICKS_TOKEN)" >> $HOME/.databrickscfg
cat /home/vsts/.databrickscfg
mkdir -p /home/vsts/mlflow/$(MODEL_NAME)/artifacts
displayName: 'Configure mlflow client connection to mlflow global (per org) model registry'
- task: PythonScript@0
inputs:
scriptSource: 'filePath'
scriptPath: '$(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py'
arguments: '--output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)'
python $(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py --output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)
ls -latr /home/vsts/mlflow/$(MODEL_NAME)/artifacts
dbfs cp --recursive --overwrite /home/vsts/mlflow/$(MODEL_NAME)/artifacts/ dbfs:/FileStore/Shared/db-automation/mlflow/$(MODEL_NAME)/model/ --profile AZDO
displayName: 'Retrieve artifacts from the mlflow global (per org) model registry to use them in Databricks Staging'
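The two steps above first write a `[registry]` profile into `~/.databrickscfg` so the mlflow client can reach the central (per-org) registry workspace, then pull the model's artifacts onto the build agent and copy them into DBFS through the separate `AZDO` CLI profile. For reference, the shell lines that build the profile amount to this Python sketch (the host and token values are placeholders for the `WORKSPACE_REGION_URL` and `DATABRICKS_TOKEN` pipeline variables):

```python
# Sketch of the profile the inline script writes; mlflow later resolves
# the tracking URI "databricks://registry" against this section name.
import configparser
import os

cfg = configparser.ConfigParser()
cfg["registry"] = {
    "host": "https://adb-1234567890123456.7.azuredatabricks.net",  # placeholder for $(WORKSPACE_REGION_URL)
    "token": "dapi-xxxxxxxxxxxxxxxx",                              # placeholder for $(DATABRICKS_TOKEN)
}
with open(os.path.expanduser("~/.databrickscfg"), "w") as f:
    cfg.write(f)
```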
- task: liprec.vsts-publish-adf.deploy-adf-json.deploy-adf-json@2
displayName: 'Deploy Data Pipeline to $(STAGING_ADF_NAME) ADF'
inputs:
@@ -276,7 +274,7 @@ stages:
scriptType: bash
scriptLocation: inlineScript
inlineScript: |
python $(Build.Repository.LocalPath)/cicd-scripts/adf_pipeline_run.py -r $(RESOURCE_GROUP) -a $(STAGING_ADF_NAME) -p $(STAGING_ADF_PIPELINE_NAME) -o ./logs/json -pa "{\"environment\":\"-s staging\"}"
python $(Build.Repository.LocalPath)/cicd-scripts/adf_pipeline_run.py -r $(RESOURCE_GROUP) -a $(STAGING_ADF_NAME) -p $(STAGING_ADF_PIPELINE_NAME) -o ./logs/json -pa "{\"environment\":\"--stage=staging\", \"model_path\":\"--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/$(MODEL_NAME)\"}"
useGlobalConfig: true
timeoutInMinutes: 10
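The `-pa` flag carries a JSON object whose keys must match the ADF pipeline's parameter names (see the `data_ml_pipeline.json` diff below); each value is a fully formed command-line flag that ADF forwards to the Spark job. A sketch of how `adf_pipeline_run.py` plausibly turns that JSON into a pipeline run, assuming the standard `azure-mgmt-datafactory` client (the credential wiring and resource names here are illustrative, not taken from this repo):

```python
# Sketch only: parse the -pa JSON and trigger the ADF pipeline with it.
import json
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

pa = ('{"environment": "--stage=staging", '
      '"model_path": "--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/wine-model"}')
parameters = json.loads(pa)  # keys match the pipeline's declared parameters

adf = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")  # placeholder
run = adf.pipelines.create_run(
    resource_group_name="<resource-group>",   # $(RESOURCE_GROUP)
    factory_name="<staging-adf-name>",        # $(STAGING_ADF_NAME)
    pipeline_name="<pipeline-name>",          # $(STAGING_ADF_PIPELINE_NAME)
    parameters=parameters,
)
print(run.run_id)
```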

@@ -348,20 +346,17 @@ stages:
displayName: 'Import ML Batch Inference script'
- script: |
id
echo $HOME
echo "[registry]" > $HOME/.databrickscfg
ls $(Build.Repository.LocalPath)/libraries/python/dbxdemo/
echo >> $HOME/.databrickscfg
echo "[registry]" >> $HOME/.databrickscfg
echo "host=$(WORKSPACE_REGION_URL)" >> $HOME/.databrickscfg
echo "token=$(DATABRICKS_TOKEN)" >> $HOME/.databrickscfg
cat /home/vsts/.databrickscfg
mkdir -p /home/vsts/mlflow/$(MODEL_NAME)/artifacts
displayName: 'Configure mlflow client connection to mlflow global (per org) model registry'
- task: PythonScript@0
inputs:
scriptSource: 'filePath'
scriptPath: '$(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py'
arguments: '--output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)'
python $(Build.Repository.LocalPath)/cicd-scripts/remote_registry_mlflow.py --output_local_path=/home/vsts/mlflow/$(MODEL_NAME)/artifacts --model $(MODEL_NAME)
ls -latr /home/vsts/mlflow/$(MODEL_NAME)/artifacts
dbfs cp --recursive --overwrite /home/vsts/mlflow/$(MODEL_NAME)/artifacts/ dbfs:/FileStore/Shared/db-automation/mlflow/$(MODEL_NAME)/model/ --profile AZDO
displayName: 'Retrieve artifacts from the mlflow global (per org) model registry to use them in Databricks Production'
- task: liprec.vsts-publish-adf.deploy-adf-json.deploy-adf-json@2
8 changes: 5 additions & 3 deletions cicd-scripts/remote_registry_mlflow.py
@@ -11,6 +11,7 @@
from mlflow.utils.string_utils import strip_prefix
from mlflow.exceptions import MlflowException
from mlflow.tracking import artifact_utils
from mlflow.tracking import MlflowClient


def _get_dbfs_endpoint(artifact_uri, artifact_path):
@@ -64,11 +65,13 @@ def main():
parser = argparse.ArgumentParser(description="Execute python scripts in Databricks")
parser.add_argument("-o", "--output_local_path", help="Output path where the artifacts will be written", required=True)
parser.add_argument("-m", "--model_name", help="Model Registry Name", required=True)
parser.add_argument("-s", "--stage", help="Stage", default="staging", required=False)
args = parser.parse_args()


model_name = args.model_name
output_local_path = args.output_local_path
stage = args.stage

cli_profile_name = "registry"
# TODO: Document that we assume the registry profile will already exist on the local machine:
@@ -78,11 +81,11 @@ def main():
TRACKING_URI = f"databricks://{cli_profile_name}"
print(f"TRACKING_URI: {TRACKING_URI}")
artifact_path = 'model'
from mlflow.tracking import MlflowClient

remote_client = MlflowClient(tracking_uri=TRACKING_URI)
mlflow.set_tracking_uri(TRACKING_URI)
# client = mlflow.tracking.MlflowClient()
latest_model = remote_client.get_latest_versions(name=model_name, stages=["staging"])
latest_model = remote_client.get_latest_versions(name=model_name, stages=[stage])
print(f"Latest Model: {latest_model}")
run_id = latest_model[0].run_id
artifact_uri = artifact_utils.get_artifact_uri(run_id)
@@ -103,7 +106,6 @@ def main():
# remote_client = MlflowClient(tracking_uri=TRACKING_URI)



if __name__ == '__main__':
main()
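Net effect of this file's changes: the `MlflowClient` import moves to the top of the module, and a new `--stage` flag replaces the hard-coded `staging` lookup, so the same script can serve both the Staging and Production stages of the pipeline. The core retrieval logic, as a standalone sketch (model name, stage, and paths are illustrative; `get_latest_versions` and `download_artifacts` are standard `MlflowClient` methods):

```python
# Sketch: resolve the newest model version in a given registry stage on a
# remote workspace, then download its 'model' artifact to a local directory.
import os
import mlflow
from mlflow.tracking import MlflowClient

tracking_uri = "databricks://registry"  # 'registry' = profile name in ~/.databrickscfg
mlflow.set_tracking_uri(tracking_uri)
client = MlflowClient(tracking_uri=tracking_uri)

latest = client.get_latest_versions(name="wine-model", stages=["staging"])
run_id = latest[0].run_id

os.makedirs("/tmp/wine-model", exist_ok=True)
local_dir = client.download_artifacts(run_id, "model", dst_path="/tmp/wine-model")
print(f"run_id={run_id}, artifacts in {local_dir}")
```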

27 changes: 25 additions & 2 deletions pipeline/ML/inference/batch_model.py
@@ -3,6 +3,8 @@
import requests
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from mlflow.tracking import artifact_utils
from mlflow import pyfunc
import json
from pyspark.sql.functions import col
@@ -39,20 +41,24 @@ def main():
parser.add_argument(
"-t", "--table_name", help="Output Table name", default="mlops_wine_quality_regression",
required=False)
# parser.add_argument("-p", "--phase", help="Phase", default="qa", required=True)
parser.add_argument(
"-p", "--model_path", help="Model's artifacts path",
default="/dbfs/FileStore/Shared/db-automation/mlflow/wine-model", required=True)

args = parser.parse_args()
model_name = args.model_name
home = args.root_path
stage = args.stage
stage = args.stage.replace(" ", "")
db = args.db_name.replace("@", "_").replace(".", "_")
ml_output_predictions_table = args.table_name
model_path = args.model_path

print(f"Model name: {model_name}")
print(f"home: {home}")
print(f"stage: {stage}")
print(f"db: {db}")
print(f"ml_output_predictions_table: {ml_output_predictions_table}")
print(f"model_path: {model_path}")
print("batch_inference")

temp_data_path = f"/dbfs/tmp/mlflow-wine-quality.csv"
@@ -61,8 +67,25 @@ def main():
wine_df = spark.read.format("csv").option("header", "true").load(dbfs_wine_data_path).drop("quality").cache()
wine_df = wine_df.select(*(col(column).cast("float").alias(column.replace(" ", "_")) for column in wine_df.columns))
data_spark = wine_df
model_artifact = 'model'
artifact_uri = model_path
print(f"artifact_uri: {artifact_uri}")
model_uri = f"{artifact_uri}/{model_artifact}"
print(f"model_uri: {model_uri}")
udf = pyfunc.spark_udf(spark, model_uri)

# data_spark = spark.read.csv(dbfs_wine_data_path, header=True)
predictions = data_spark.select(udf(*data_spark.columns).alias('prediction'), "*")

spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"DROP TABLE IF EXISTS {db}.{ml_output_predictions_table}")
predictions.write.format("delta").mode("overwrite").saveAsTable(f"{db}.{ml_output_predictions_table}")
output = json.dumps({
"model_name": model_name,
"model_uri": model_uri
})

print(output)


if __name__ == '__main__':
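The inference script now receives the model location as an explicit `--model_path` pointing at the DBFS copy made during CI, instead of resolving it against the registry at run time. Stripped of the pipeline plumbing, the scoring pattern looks like this sketch (paths and table names are illustrative; `pyfunc.spark_udf` wraps any MLflow model as a Spark UDF):

```python
# Sketch: batch-score a Spark DataFrame with an MLflow pyfunc model on DBFS,
# then persist the predictions as a Delta table.
from mlflow import pyfunc
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

model_uri = "/dbfs/FileStore/Shared/db-automation/mlflow/wine-model/model"  # <model_path>/<artifact>
predict = pyfunc.spark_udf(spark, model_uri)

# (the real script also casts columns to float and normalizes column names first)
df = spark.read.format("csv").option("header", "true").load("dbfs:/tmp/mlflow-wine-quality.csv")
scored = df.select(predict(*df.columns).alias("prediction"), "*")

spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")  # illustrative database name
scored.write.format("delta").mode("overwrite").saveAsTable("demo_db.wine_predictions")
```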
13 changes: 10 additions & 3 deletions resources/adf/pipeline/data_ml_pipeline.json
@@ -79,8 +79,11 @@
"typeProperties": {
"pythonFile": "dbfs:/FileStore/Shared/db-automation/ML/batch_model.py",
"parameters": [
"-m mlops-wine-model -o -r dbfs:/FileStore/Shared/db-automation -t test_table",
"@pipeline().parameters.environment"
"--model_name=mlops-wine-model",
"@pipeline().parameters.environment",
"--root_path=dbfs:/FileStore/Shared/db-automation",
"--table_name=wine_output_table_1",
"@pipeline().parameters.model_path"
]
},
"linkedServiceName": {
@@ -92,7 +95,11 @@
"parameters": {
"environment": {
"type": "string",
"defaultValue": "-s test"
"defaultValue": "--stage=test"
},
"model_path": {
"type": "string",
"defaultValue": "--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/mlops-wine-model"
}
},
"annotations": []
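Each entry in the activity's `parameters` array reaches `batch_model.py` as its own argv token, which is why the pipeline parameters now carry fully formed `--flag=value` strings. The old single-token style (`-s test`) is read by argparse as `-s` with a leading-space value, hence the `stage.replace(" ", "")` guard added in the script. A sketch of the resulting parse (short flags partly reconstructed from the diff; `--db_name` and the defaults marked below are assumptions):

```python
# Sketch: simulate the argv that ADF assembles for batch_model.py.
import argparse

argv = [
    "--model_name=mlops-wine-model",
    "--stage=test",  # substituted from @pipeline().parameters.environment
    "--root_path=dbfs:/FileStore/Shared/db-automation",
    "--table_name=wine_output_table_1",
    "--model_path=/dbfs/FileStore/Shared/db-automation/mlflow/mlops-wine-model",  # from @pipeline().parameters.model_path
]

parser = argparse.ArgumentParser()
parser.add_argument("-m", "--model_name")
parser.add_argument("-s", "--stage", default="staging")
parser.add_argument("-r", "--root_path")
parser.add_argument("-d", "--db_name", default="mlops_db")  # assumption: flag not shown in the diff
parser.add_argument("-t", "--table_name", default="mlops_wine_quality_regression")
parser.add_argument("-p", "--model_path", required=True)

args = parser.parse_args(argv)
print(args.stage, args.model_path)  # -> test /dbfs/FileStore/Shared/db-automation/mlflow/mlops-wine-model
```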
