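"""Exports BigQuery dataset, table, and view metadata for a project into local
JSON and SQL files under ./bq_metadata/<project_id>/."""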
from google.cloud import bigquery
import argparse
import logging
import json
import os
import io


class Importer:
def __init__(self, project):
        self.client = bigquery.Client(project=project)
self.project = project

    def remove_key(self, container, key):
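        """Recursively removes `key` from nested dicts and lists in place."""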
        if isinstance(container, dict):
if key in container:
del container[key]
for v in container.values():
self.remove_key(v, key)
        if isinstance(container, list):
for v in container:
self.remove_key(v, key)

    def write_table_json(self, table: bigquery.Table) -> None:
        """Creates a JSON file containing the given BigQuery table's configuration."""
f = io.StringIO("")
self.client.schema_to_json(table.schema, f)
        content = {
            "projectId": table.project,
            "datasetId": table.dataset_id,
            "tableId": table.table_id,
            "schema": {
                "fields": json.loads(f.getvalue())
            }
        }
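        # Strip policyTags (column-level security references) from the exported schema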
self.remove_key(content['schema'], 'policyTags')
        if table.labels:
            content["labels"] = table.labels
        if table.time_partitioning:
            content["timePartitioning"] = {}
            content["timePartitioning"]["field"] = table.time_partitioning.field
            content["timePartitioning"]["type"] = table.time_partitioning.type_
            if table.time_partitioning.expiration_ms:
                expiration_days = table.time_partitioning.expiration_ms / 1000 / 60 / 60 / 24  # ms to days
                content["timePartitioning"]["expirationDays"] = int(expiration_days)
with open(f'./bq_metadata/{self.project}/{table.dataset_id}/tables/{table.table_id}.json', 'w',
encoding='utf-8') as f:
f.write(json.dumps(content, indent=4))
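
    # Example of a file written by write_table_json (illustrative values):
    # {
    #     "projectId": "my-project",
    #     "datasetId": "my_dataset",
    #     "tableId": "my_table",
    #     "schema": {"fields": [...]},
    #     "labels": {"team": "data"},
    #     "timePartitioning": {"field": "created_at", "type": "DAY", "expirationDays": 30}
    # }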

    def write_view_sql(self, dataset: str, table: bigquery.Table) -> None:
        """Creates a SQL file with a "create or replace view" statement for the given BigQuery view."""
content = f"create or replace view `{self.project}.{dataset}.{table.table_id}` \nas\n"
content += table.view_query
with open(f'./bq_metadata/{self.project}/{dataset}/views/{table.table_id}.sql', 'w', encoding='utf-8') as f:
f.write(content)
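
    # Example of a file written by write_view_sql (illustrative):
    #   create or replace view `my-project.my_dataset.my_view`
    #   as
    #   SELECT ...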

    def write_dataset_json(self, dataset: bigquery.Dataset) -> None:
        """Creates a JSON file containing the given BigQuery dataset's configuration."""
        content = {
            "location": dataset.location,
            "projectId": dataset.project,
            "datasetId": dataset.dataset_id,
            "properties": {}
        }
properties = ["description", "default_table_expiration_ms", "default_partition_expiration_ms", "labels"]
        for prop in properties:
            if getattr(dataset, prop):
                content["properties"][prop] = getattr(dataset, prop)
with open(f'./bq_metadata/{self.project}/{dataset.dataset_id}/{dataset.dataset_id}.json', 'w',
encoding='utf-8') as f:
f.write(json.dumps(content, indent=4))

    def run(self) -> None:
        logging.info("Import started...")
        # Create the project folder if it does not exist
if not os.path.exists(f'./bq_metadata/{self.project}'):
os.mkdir(f'./bq_metadata/{self.project}')
        # Get datasets and tables in the project
datasets = list(self.client.list_datasets())
logging.info(f"There are {len(datasets)} datasets in the project {self.project}")
if datasets:
logging.info("Datasets in project {}:".format(self.project))
for dataset in datasets:
                # Create the dataset folder (with views/ and tables/ subfolders) if it does not exist
is_dataset_folder_exists = os.path.exists(f'./bq_metadata/{self.project}/{dataset.dataset_id}')
if not is_dataset_folder_exists:
os.mkdir(f'./bq_metadata/{self.project}/{dataset.dataset_id}')
os.mkdir(f'./bq_metadata/{self.project}/{dataset.dataset_id}/views')
os.mkdir(f'./bq_metadata/{self.project}/{dataset.dataset_id}/tables')
                # Write the dataset JSON file
self.write_dataset_json(dataset=self.client.get_dataset(dataset.dataset_id))
logging.info("{}".format(dataset.dataset_id))
                # Get tables and views in the dataset
tables = self.client.list_tables(dataset)
for table in tables:
logging.info("\t{}".format(table.table_id))
if table.table_type == "VIEW":
self.write_view_sql(dataset.dataset_id, self.client.get_table(table))
if table.table_type == "TABLE":
self.write_table_json(table=self.client.get_table(table))
else:
logging.info("{} project does not contain any datasets.".format(self.project))


if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
# Get Arguments
parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--project', default=None, help='GCP project ID (default: None)')
args = parser.parse_args()
logging.info(f"Parameters: {args}")
    # Change the working directory to the script's folder so relative paths resolve
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)
importer = Importer(project=args.project)
importer.run()
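
# Example invocation (assumes Google Cloud credentials are configured, e.g. via
# GOOGLE_APPLICATION_CREDENTIALS, and that a ./bq_metadata/ folder exists next to this script):
#   python importer.py --project my-gcp-project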