-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcreate_view.py
85 lines (63 loc) · 2.67 KB
/
create_view.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from google.cloud import bigquery
def _extract_json(column, feature_name):
return "JSON_EXTRACT({}, '$.{}')".format(column, feature_name)
def _cast_to_numeric(field):
return "CAST({} AS FLOAT64)".format(field)
def _add_alias(field, feature_name):
return "{} AS {}".format(field, feature_name.replace('-prod', ''))
def create_view(GOOGLE_CLOUD_PROJECT, BQ_DATASET_NAME, BQ_TABLE_NAME, PIPELINE_NAME, VERSION_NAME):
    """Create or replace a BigQuery view that flattens logged predictions.

    The view is named ``vw_<BQ_TABLE_NAME>_<VERSION_NAME>`` and is created in
    ``BQ_DATASET_NAME``. It selects ``model``, ``model_version`` and ``time``
    from the source table, plus a ``request`` array that pairs each element of
    ``raw_data``'s ``$.instances`` with the element at the same offset in
    ``raw_prediction``'s ``$.predictions``. Rows are filtered to the given
    model name and version.

    Args:
        GOOGLE_CLOUD_PROJECT: project used for the BigQuery client and to
            qualify the source table.
        BQ_DATASET_NAME: dataset containing the source table and the view.
        BQ_TABLE_NAME: source table name; also part of the view name.
        PIPELINE_NAME: model name to filter on; hyphens are replaced with
            underscores before it is compared with the ``model`` column.
        VERSION_NAME: model version to filter on; also part of the view name.

    Returns:
        The SQL script that was submitted.

    Raises:
        Any exception raised by the BigQuery client while submitting the
        query or waiting for it to finish is propagated to the caller.
    """
    TITLE_FEATURES = ['title-prod']
    LABEL_KEY = 'label_key'
    SCORE_KEY = 'prediction_confidence'

    view_name = "vw_" + BQ_TABLE_NAME + "_" + VERSION_NAME

    # Feature columns: first element of each feature array in the instance.
    json_features_extraction = ', \r\n '.join(
        _add_alias(_extract_json('instances', feature_name + '[0]'), feature_name)
        for feature_name in TITLE_FEATURES
    )

    # Prediction columns: extracted from the prediction JSON and cast to FLOAT64.
    json_prediction_extraction = ', \r\n '.join(
        _add_alias(_cast_to_numeric(_extract_json('predictions', key)), key)
        for key in (LABEL_KEY, SCORE_KEY)
    )

    # Template lines are kept flush-left so the emitted SQL text matches the
    # template exactly; the @placeholders are substituted below.
    sql_script = '''
CREATE OR REPLACE VIEW @dataset_name.@view_name
AS
SELECT
model,
model_version,
time,
ARRAY(
SELECT AS STRUCT
@json_features_extraction,
@json_prediction_extraction
FROM
UNNEST(JSON_EXTRACT_ARRAY(raw_prediction, "$.predictions")
) predictions WITH OFFSET AS f1
JOIN
UNNEST(JSON_EXTRACT_ARRAY(raw_data, "$.instances")) instances WITH OFFSET AS f2
ON f1=f2
) as request
FROM
@project.@dataset_name.@table_name
WHERE
model = "@model_name" AND
model_version = "@version"
'''

    # Substitutions are applied strictly in this order; the generated column
    # lists go last so earlier replacements never touch their text.
    substitutions = (
        ("@project", GOOGLE_CLOUD_PROJECT),
        ("@dataset_name", BQ_DATASET_NAME),
        ("@table_name", BQ_TABLE_NAME),
        ("@view_name", view_name),
        ("@model_name", PIPELINE_NAME.replace('-', '_')),
        ("@version", VERSION_NAME),
        ("@json_features_extraction", json_features_extraction),
        ("@json_prediction_extraction", json_prediction_extraction),
    )
    for placeholder, value in substitutions:
        sql_script = sql_script.replace(placeholder, value)

    client = bigquery.Client(GOOGLE_CLOUD_PROJECT)
    # query() only submits the job; result() waits for the DDL to finish and
    # raises if it failed, so the success message below is only printed once
    # the view really exists.
    client.query(query=sql_script).result()
    print("View was created or replaced.")
    return sql_script