From bbf3ddbc3832c01e79a8886fe0ad0e0e89e17d68 Mon Sep 17 00:00:00 2001 From: Amit Gal Date: Sun, 24 Jan 2021 17:39:49 +0100 Subject: [PATCH 1/3] authenticate temp flow w. prefect secrets --- .gitignore | 4 ++-- .../flows/cbs/register_statline_bq_flow.py | 9 ++++++--- nl_open_data/flows/cbs/register_temp_secrets.py | 16 ++++++++++++++++ nl_open_data/flows/cbs/run_regionaal.py | 6 +++--- nl_open_data/flows/cbs/run_temp_secrets.py | 14 ++++++++++++++ nl_open_data/user_config.toml | 3 +++ nl_open_data/utils.py | 10 ++++++++-- pyproject.toml | 4 ++-- 8 files changed, 54 insertions(+), 12 deletions(-) create mode 100644 nl_open_data/flows/cbs/register_temp_secrets.py create mode 100644 nl_open_data/flows/cbs/run_temp_secrets.py diff --git a/.gitignore b/.gitignore index 282149d..3be9df4 100644 --- a/.gitignore +++ b/.gitignore @@ -114,5 +114,5 @@ test.ipynb # Local temp files temp/ -# Prefect token -prefect_cloud_token.txt \ No newline at end of file +# Prefect secrets +secrets/ \ No newline at end of file diff --git a/nl_open_data/flows/cbs/register_statline_bq_flow.py b/nl_open_data/flows/cbs/register_statline_bq_flow.py index da813cb..e5d2314 100644 --- a/nl_open_data/flows/cbs/register_statline_bq_flow.py +++ b/nl_open_data/flows/cbs/register_statline_bq_flow.py @@ -13,9 +13,9 @@ from datetime import datetime -# from box import Box from prefect import task, Flow, unmapped, Parameter from prefect.executors import DaskExecutor +from prefect.client import Secret from statline_bq.utils import ( check_gcp_env, check_v4, @@ -90,6 +90,7 @@ gcp_env = Parameter("gcp_env", default="dev") force = Parameter("force", default=False) + gcp_credentials = Secret("GCP_CREDENTIALS").get() ids = upper.map( ids ) # TODO: Do we need a different variable name here (ids_upper = ...)? 
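# A minimal sketch of the Secret lookup added in the hunk above, assuming Prefect Cloud
# secrets are used (the user_config.toml hunk later in this patch sets
# [cloud] use_local_secrets = false, so Secret.get() asks the Cloud API instead of
# reading prefect.context.secrets).
#
#   from prefect.client import Secret
#
#   # .get() is a plain method call, not a task, so it does not defer to flow-run time;
#   # presumably this is why the last patch in this series replaces it with a
#   # service_account_info Parameter that the run scripts resolve and pass in.
#   gcp_credentials = Secret("GCP_CREDENTIALS").get()  # the service-account info stored under that name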
@@ -192,6 +193,8 @@ if __name__ == "__main__": # Register flow statline_flow.executor = DaskExecutor() + print("Output last registration") + print("------------------------") flow_id = statline_flow.register( project_name="nl_open_data", version_group_id="statline_bq" ) @@ -201,10 +204,10 @@ Output last registration ------------------------ Flow URL: https://cloud.prefect.io/dataverbinders/flow/eef07631-c5d3-4313-9b2c-41b1e8d180a8 - └── ID: 2dedcace-27ec-42b9-8be7-dcdd954078e4 + └── ID: 36aed3bf-d5ad-4863-903f-08075b77b052 └── Project: nl_open_data └── Labels: ['tud0029822'] - └── Registered on: 2021-01-12 14:52:31.387941 + └── Registered on: 2021-01-21 16:59:49.649501 """ # Run locally diff --git a/nl_open_data/flows/cbs/register_temp_secrets.py b/nl_open_data/flows/cbs/register_temp_secrets.py new file mode 100644 index 0000000..6a090f9 --- /dev/null +++ b/nl_open_data/flows/cbs/register_temp_secrets.py @@ -0,0 +1,16 @@ +from nl_open_data.config import config +from prefect import Flow, task, Parameter + +from nl_open_data.utils import check_bq_dataset, get_gcp_credentials + +check_bq_dataset = task(check_bq_dataset) +get_gcp_credentials = task(get_gcp_credentials) + +with Flow("test_secret") as flow: + json_acct_info = Parameter("json_acct_info") + gcp_credentials = get_gcp_credentials(json_acct_info) + check_bq_dataset( + dataset_id="83583NED", gcp=config.gcp.dev, credentials=gcp_credentials + ) + +flow.register("nl_open_data", version_group_id="test_secrets") diff --git a/nl_open_data/flows/cbs/run_regionaal.py b/nl_open_data/flows/cbs/run_regionaal.py index 7a832f2..3c57624 100644 --- a/nl_open_data/flows/cbs/run_regionaal.py +++ b/nl_open_data/flows/cbs/run_regionaal.py @@ -56,9 +56,9 @@ "gcp_env": GCP_ENV, "force": FORCE, } -# flow_run_id = client.create_flow_run( -# version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters -# ) +flow_run_id = client.create_flow_run( + version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters +) #################### diff --git a/nl_open_data/flows/cbs/run_temp_secrets.py b/nl_open_data/flows/cbs/run_temp_secrets.py new file mode 100644 index 0000000..3c3d6f4 --- /dev/null +++ b/nl_open_data/flows/cbs/run_temp_secrets.py @@ -0,0 +1,14 @@ +from nl_open_data.config import config + +from prefect import Client +from prefect.client import Secret + +TENANT_SLUG = "dataverbinders" +JSON_ACCT_INFO = Secret("GCP_CREDENTIALS").get() + +client = Client() # Local api key has been stored previously +client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token + +client.create_flow_run( + version_group_id="test_secrets", parameters={"json_acct_info": JSON_ACCT_INFO}, +) diff --git a/nl_open_data/user_config.toml b/nl_open_data/user_config.toml index 924a17e..e3bcfb8 100644 --- a/nl_open_data/user_config.toml +++ b/nl_open_data/user_config.toml @@ -1,3 +1,6 @@ +[cloud] +use_local_secrets = false + [gcp] [gcp.prod] project_id = "dataverbinders" diff --git a/nl_open_data/utils.py b/nl_open_data/utils.py index 606a05a..167a76c 100644 --- a/nl_open_data/utils.py +++ b/nl_open_data/utils.py @@ -4,10 +4,16 @@ from google.cloud import storage from google.cloud import bigquery from google.cloud import exceptions +from google.oauth2 import service_account from statline_bq.config import Config, GcpProject +def get_gcp_credentials(json_acct_info): + credentials = service_account.Credentials.from_service_account_info(json_acct_info) + return credentials + + def create_dir_util(path: Union[Path, str]) -> Path: """Checks whether a 
path exists and is a directory, and creates it if not. @@ -42,7 +48,7 @@ def set_gcp(config: Config, gcp_env: str) -> GcpProject: return config_envs[gcp_env] -def check_bq_dataset(dataset_id: str, gcp: GcpProject) -> bool: +def check_bq_dataset(dataset_id: str, gcp: GcpProject, credentials: str = None) -> bool: """Check if dataset exists in BQ. Parameters @@ -57,7 +63,7 @@ def check_bq_dataset(dataset_id: str, gcp: GcpProject) -> bool: - True if exists, False if does not exists """ - client = bigquery.Client(project=gcp.project_id) + client = bigquery.Client(project=gcp.project_id, credentials=credentials) try: client.get_dataset(dataset_id) # Make an API request. diff --git a/pyproject.toml b/pyproject.toml index ee11af7..84381a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,8 +18,8 @@ pyarrow = "^2.0.0" bunch = "^1.0.1" lxml = "^4.5.2" xmltodict = "^0.12.0" -# statline_bq = {git = "https://github.com/dataverbinders/statline-bq.git", branch = "main"} # takes main branch of statline-bq from github -statline_bq = {path = "/Users/tslilstrauss/Projects/statline-bq", develop = true} # takes statline-bq from a local path +statline_bq = {git = "https://github.com/dataverbinders/statline-bq.git", branch = "main"} # takes main branch of statline-bq from github +# statline_bq = {path = "/Users/tslilstrauss/Projects/statline-bq", develop = true} # takes statline-bq from a local path pandas = "^1.1.5" python-box = "^5.2.0" From faf8b5e408f526b6b9fa80b9c62d3806bf31f4ee Mon Sep 17 00:00:00 2001 From: Amit Gal Date: Mon, 25 Jan 2021 11:14:38 +0100 Subject: [PATCH 2/3] renaming parameters --- nl_open_data/flows/cbs/register_temp_secrets.py | 6 +++--- nl_open_data/flows/cbs/run_temp_secrets.py | 5 +++-- nl_open_data/utils.py | 8 ++++++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/nl_open_data/flows/cbs/register_temp_secrets.py b/nl_open_data/flows/cbs/register_temp_secrets.py index 6a090f9..ed6dfc4 100644 --- a/nl_open_data/flows/cbs/register_temp_secrets.py +++ b/nl_open_data/flows/cbs/register_temp_secrets.py @@ -6,9 +6,9 @@ check_bq_dataset = task(check_bq_dataset) get_gcp_credentials = task(get_gcp_credentials) -with Flow("test_secret") as flow: - json_acct_info = Parameter("json_acct_info") - gcp_credentials = get_gcp_credentials(json_acct_info) +with Flow("test_secrets") as flow: + service_account_info = Parameter("service_account_info") + gcp_credentials = get_gcp_credentials(service_account_info) check_bq_dataset( dataset_id="83583NED", gcp=config.gcp.dev, credentials=gcp_credentials ) diff --git a/nl_open_data/flows/cbs/run_temp_secrets.py b/nl_open_data/flows/cbs/run_temp_secrets.py index 3c3d6f4..c6332b6 100644 --- a/nl_open_data/flows/cbs/run_temp_secrets.py +++ b/nl_open_data/flows/cbs/run_temp_secrets.py @@ -4,11 +4,12 @@ from prefect.client import Secret TENANT_SLUG = "dataverbinders" -JSON_ACCT_INFO = Secret("GCP_CREDENTIALS").get() +GCP_SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get() client = Client() # Local api key has been stored previously client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token client.create_flow_run( - version_group_id="test_secrets", parameters={"json_acct_info": JSON_ACCT_INFO}, + version_group_id="test_secrets", + parameters={"service_account_info": GCP_SERVICE_ACCOUNT_INFO}, ) diff --git a/nl_open_data/utils.py b/nl_open_data/utils.py index 167a76c..58743ed 100644 --- a/nl_open_data/utils.py +++ b/nl_open_data/utils.py @@ -9,8 +9,12 @@ from statline_bq.config import Config, GcpProject -def 
get_gcp_credentials(json_acct_info): - credentials = service_account.Credentials.from_service_account_info(json_acct_info) +def get_gcp_credentials( + service_account_info, +): # ADD TYPE CHECKING service_account_info / Credentials + credentials = service_account.Credentials.from_service_account_info( + service_account_info + ) return credentials From d206b2b8364d1f1cf1a625216e0cf6b418c1bb64 Mon Sep 17 00:00:00 2001 From: Amit Gal Date: Mon, 25 Jan 2021 18:29:49 +0100 Subject: [PATCH 3/3] authenticate as service account using prefect.secret --- .gitignore | 1 + .../flows/cbs/register_statline_bq_flow.py | 66 ++++++++++++++----- .../flows/cbs/register_temp_secrets.py | 4 +- .../flows/cbs/register_zip_csv_flow.py | 9 ++- nl_open_data/flows/cbs/run_mlz.py | 5 ++ nl_open_data/flows/cbs/run_regionaal.py | 8 ++- nl_open_data/flows/cbs/run_rivm.py | 5 ++ nl_open_data/tasks.py | 47 +++++++++++-- nl_open_data/utils.py | 48 +++++++++----- poetry.lock | 32 ++++++--- pyproject.toml | 1 + 11 files changed, 174 insertions(+), 52 deletions(-) diff --git a/.gitignore b/.gitignore index 3be9df4..d74bd40 100644 --- a/.gitignore +++ b/.gitignore @@ -110,6 +110,7 @@ dask-worker-space/ # Local testing notebook test.ipynb +test2.ipynb # Local temp files temp/ diff --git a/nl_open_data/flows/cbs/register_statline_bq_flow.py b/nl_open_data/flows/cbs/register_statline_bq_flow.py index e5d2314..4aff664 100644 --- a/nl_open_data/flows/cbs/register_statline_bq_flow.py +++ b/nl_open_data/flows/cbs/register_statline_bq_flow.py @@ -14,8 +14,8 @@ from datetime import datetime from prefect import task, Flow, unmapped, Parameter -from prefect.executors import DaskExecutor from prefect.client import Secret +from prefect.executors import DaskExecutor from statline_bq.utils import ( check_gcp_env, check_v4, @@ -37,7 +37,7 @@ bq_update_main_table_col_descriptions, ) -from nl_open_data.tasks import upper, lower, remove_dir, skip_task +from nl_open_data.tasks import get_gcp_credentials, upper, lower, remove_dir, skip_task # Converting statline-bq functions to tasks check_gcp_env = task(check_gcp_env) @@ -82,6 +82,11 @@ force : bool, default = False If set to True, processes datasets, even if Modified dates are identical in source and target locations. + + service_account_info : str (JSON) + A service account info from GCP provided as a JSON string, such + as the one required by: + google.oauth2.service_account.Credentials.from_service_account_info() """ ids = Parameter("ids") @@ -89,8 +94,9 @@ third_party = Parameter("third_party", default=False) gcp_env = Parameter("gcp_env", default="dev") force = Parameter("force", default=False) + service_account_info = Parameter("service_account_info") - gcp_credentials = Secret("GCP_CREDENTIALS").get() + gcp_credentials = get_gcp_credentials(service_account_info) ids = upper.map( ids ) # TODO: Do we need a different variable name here (ids_upper = ...)? 
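# A minimal sketch of what the get_gcp_credentials task introduced above produces, and of
# the credentials= arguments threaded through the hunks below. The conversion mirrors the
# task added to nl_open_data/tasks.py in this patch; the project id and dataset id are
# placeholders.
#
#   from google.oauth2 import service_account
#   from google.cloud import bigquery
#
#   # service_account_info is the parsed key-file JSON passed in as a flow Parameter
#   credentials = service_account.Credentials.from_service_account_info(service_account_info)
#
#   # downstream GCP clients then authenticate as that service account instead of the
#   # default credentials they would otherwise pick up from the environment
#   client = bigquery.Client(project="your-gcp-project", credentials=credentials)
#   client.get_dataset("your_dataset")  # raises google.cloud.exceptions.NotFound if absent
#
# gcp_credentials is wrapped in unmapped() in the mapped calls below because it is a
# single shared value handed to every mapped task run, not a sequence to map over.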
@@ -102,7 +108,11 @@ ) source_metas = get_metadata_cbs.map(urls=urls, odata_version=odata_versions) gcp_metas = get_metadata_gcp.map( - id=ids, source=unmapped(source), odata_version=odata_versions, gcp=unmapped(gcp) + id=ids, + source=unmapped(source), + odata_version=odata_versions, + gcp=unmapped(gcp), + credentials=unmapped(gcp_credentials), ) # TODO: skip if force=True cbs_modifieds = get_from_meta.map(meta=source_metas, key=unmapped("Modified")) gcp_modifieds = get_gcp_modified.map( @@ -155,6 +165,7 @@ id=ids, config=unmapped(config), gcp_env=unmapped(gcp_env), + credentials=unmapped(gcp_credentials), upstream_tasks=[files_parquet, col_desc_files, go_nogo], ) file_names = get_file_names.map(files_parquet, upstream_tasks=[go_nogo],) @@ -167,6 +178,7 @@ gcs_folder=gcs_folders, file_names=file_names, gcp_env=unmapped(gcp_env), + credentials=unmapped(gcp_credentials), upstream_tasks=[gcs_folders, go_nogo], ) desc_dicts = get_col_descs_from_gcs.map( @@ -176,6 +188,7 @@ config=unmapped(config), gcp_env=unmapped(gcp_env), gcs_folder=gcs_folders, + credentials=unmapped(gcp_credentials), upstream_tasks=[gcs_folders, go_nogo], ) bq_updates = bq_update_main_table_col_descriptions.map( @@ -183,7 +196,8 @@ descriptions=desc_dicts, config=unmapped(config), gcp_env=unmapped(gcp_env), - upstream_tasks=[desc_dicts, go_nogo], + credentials=unmapped(gcp_credentials), + upstream_tasks=[desc_dicts, go_nogo], # TODO: Add dataset_refs as upstream? ) remove = remove_dir.map( pq_dir, upstream_tasks=[gcs_folders] @@ -193,24 +207,46 @@ if __name__ == "__main__": # Register flow statline_flow.executor = DaskExecutor() - print("Output last registration") + print("Registration Output") print("------------------------") flow_id = statline_flow.register( project_name="nl_open_data", version_group_id="statline_bq" ) print(f" └── Registered on: {datetime.today()}") - """ - Output last registration - ------------------------ - Flow URL: https://cloud.prefect.io/dataverbinders/flow/eef07631-c5d3-4313-9b2c-41b1e8d180a8 - └── ID: 36aed3bf-d5ad-4863-903f-08075b77b052 - └── Project: nl_open_data - └── Labels: ['tud0029822'] - └── Registered on: 2021-01-21 16:59:49.649501 - """ + ######################## # Run locally # ids = ["83583ned"] # ids = ["83583NED", "83765NED", "84799NED", "84583NED", "84286NED"] # state = statline_flow.run(parameters={"ids": ids, "force": False}) + + ######################## + + # Schedule flow-run on prefect cloud + from prefect import Client + + STATLINE_VERSION_GROUP_ID = "statline_bq" + + TENANT_SLUG = "dataverbinders" + DATA = ["83583NED"] + SOURCE = "cbs" + THIRD_PARTY = False + GCP_ENV = "dev" + FORCE = False + SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get() + + client = Client() # Local api key has been stored previously + client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token + + statline_parameters = { + "ids": DATA, + "source": SOURCE, + "third_party": THIRD_PARTY, + "gcp_env": GCP_ENV, + "force": FORCE, + "service_account_info": SERVICE_ACCOUNT_INFO, + } + flow_run_id = client.create_flow_run( + version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters + ) diff --git a/nl_open_data/flows/cbs/register_temp_secrets.py b/nl_open_data/flows/cbs/register_temp_secrets.py index ed6dfc4..434a451 100644 --- a/nl_open_data/flows/cbs/register_temp_secrets.py +++ b/nl_open_data/flows/cbs/register_temp_secrets.py @@ -1,10 +1,10 @@ from nl_open_data.config import config from prefect import Flow, task, Parameter -from nl_open_data.utils import 
check_bq_dataset, get_gcp_credentials +from nl_open_data.utils import check_bq_dataset +from nl_open_data.tasks import get_gcp_credentials check_bq_dataset = task(check_bq_dataset) -get_gcp_credentials = task(get_gcp_credentials) with Flow("test_secrets") as flow: service_account_info = Parameter("service_account_info") diff --git a/nl_open_data/flows/cbs/register_zip_csv_flow.py b/nl_open_data/flows/cbs/register_zip_csv_flow.py index 8bf2bd9..fe62c3f 100644 --- a/nl_open_data/flows/cbs/register_zip_csv_flow.py +++ b/nl_open_data/flows/cbs/register_zip_csv_flow.py @@ -57,6 +57,9 @@ "bq_dataset_description", default=None ) # TODO: implement source = Parameter("source", required=False) + service_account_info = Parameter("service_account_info") + + gcp_credentials = nlt.get_gcp_credentials(service_account_info) filename = nlt.get_filename_from_url(url) filepath = local_folder / nlt.path_wrap(filename) @@ -74,6 +77,7 @@ gcs_folder=unmapped(gcs_folder), config=unmapped(config), gcp_env=unmapped(gcp_env), + credentials=unmapped(gcp_credentials), upstream_tasks=[pq_files], ) tables = nlt.gcs_to_bq( @@ -82,6 +86,7 @@ config=config, gcp_env=gcp_env, source=source, + credentials=unmapped(gcp_credentials), description=bq_dataset_description, upstream_tasks=[gcs_ids], ) @@ -102,8 +107,8 @@ ------------------------ Result check: OK Flow URL: https://cloud.prefect.io/dataverbinders/flow/24e3c567-88c7-4a6e-8333-72a9cd1abebd - └── ID: b91257e3-7c63-468c-9460-c7403e602a0a + └── ID: dfe6b519-e20d-4de8-8188-8bf195168d83 └── Project: nl_open_data └── Labels: ['tud0029822'] - └── Registered on: 2021-01-12 18:27:56.586313 + └── Registered on: 2021-01-26 12:58:09.831253 """ diff --git a/nl_open_data/flows/cbs/run_mlz.py b/nl_open_data/flows/cbs/run_mlz.py index 22dfb68..c340772 100644 --- a/nl_open_data/flows/cbs/run_mlz.py +++ b/nl_open_data/flows/cbs/run_mlz.py @@ -4,8 +4,11 @@ [^mlz]: https://mlzopendata.cbs.nl/#/MLZ/nl/ """ +# the config object must be imported from config.py before any Prefect imports from nl_open_data.config import config + from prefect import Client +from prefect.client import Secret VERSION_GROUP_ID = "statline_bq" @@ -15,6 +18,7 @@ THIRD_PARTY = True GCP_ENV = "dev" FORCE = False +SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get() client = Client() # Local api key has been stored previously client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token @@ -24,6 +28,7 @@ "third_party": THIRD_PARTY, "gcp_env": GCP_ENV, "force": FORCE, + "service_account_info": SERVICE_ACCOUNT_INFO, } flow_run_id = client.create_flow_run( version_group_id=VERSION_GROUP_ID, parameters=parameters diff --git a/nl_open_data/flows/cbs/run_regionaal.py b/nl_open_data/flows/cbs/run_regionaal.py index 3c57624..a765d72 100644 --- a/nl_open_data/flows/cbs/run_regionaal.py +++ b/nl_open_data/flows/cbs/run_regionaal.py @@ -9,8 +9,11 @@ """ from pathlib import Path +# the config object must be imported from config.py before any Prefect imports from nl_open_data.config import config + from prefect import Client +from prefect.client import Secret STATLINE_VERSION_GROUP_ID = "statline_bq" ZIP_VERSION_GROUP_ID = "zipped_csv" @@ -18,7 +21,7 @@ TENANT_SLUG = "dataverbinders" ODATA_REGIONAAL = [ # TODO: check datasets, add and organize # Regionale kerncijfers Nederland - "70072NED" + "70072NED", # Kerncijfers wijken en buurten "84583NED", # 2019 "84286NED", # 2018 @@ -44,6 +47,7 @@ THIRD_PARTY = False GCP_ENV = "dev" FORCE = False +SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get() client = Client() # 
Local api key has been stored previously client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token @@ -55,6 +59,7 @@ "third_party": THIRD_PARTY, "gcp_env": GCP_ENV, "force": FORCE, + "service_account_info": SERVICE_ACCOUNT_INFO, } flow_run_id = client.create_flow_run( version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters @@ -82,6 +87,7 @@ "bq_dataset_name": BQ_DATASET_NAME, "bq_dataset_description": BQ_DATASET_DESCRIPTION, "source": SOURCE, + "service_account_info": SERVICE_ACCOUNT_INFO, } flow_run_id = client.create_flow_run( diff --git a/nl_open_data/flows/cbs/run_rivm.py b/nl_open_data/flows/cbs/run_rivm.py index 071efd7..90089b8 100644 --- a/nl_open_data/flows/cbs/run_rivm.py +++ b/nl_open_data/flows/cbs/run_rivm.py @@ -4,8 +4,11 @@ [^rivm]: https://statline.rivm.nl/#/RIVM/nl/dataset/50052NED/table?ts=1589622516137 """ +# the config object must be imported from config.py before any Prefect imports from nl_open_data.config import config + from prefect import Client +from prefect.client import Secret VERSION_GROUP_ID = "statline_bq" @@ -17,6 +20,7 @@ THIRD_PARTY = True GCP_ENV = "dev" FORCE = False +SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get() client = Client() # Local api key has been stored previously client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token @@ -26,6 +30,7 @@ "third_party": THIRD_PARTY, "gcp_env": GCP_ENV, "force": FORCE, + "service_account_info": SERVICE_ACCOUNT_INFO, } flow_run_id = client.create_flow_run( version_group_id=VERSION_GROUP_ID, parameters=parameters diff --git a/nl_open_data/tasks.py b/nl_open_data/tasks.py index 9869c18..b816202 100644 --- a/nl_open_data/tasks.py +++ b/nl_open_data/tasks.py @@ -3,8 +3,11 @@ import os from shutil import rmtree from zipfile import ZipFile +from collections.abc import Mapping from google.cloud import storage +from google.oauth2 import service_account +from google.oauth2.credentials import Credentials from pyarrow import csv import pyarrow.parquet as pq from prefect.engine.signals import SKIP @@ -14,6 +17,14 @@ import nl_open_data.utils as nlu +@task +def get_gcp_credentials(service_account_info: dict) -> Credentials: + credentials = service_account.Credentials.from_service_account_info( + service_account_info + ) + return credentials + + @task def path_wrap(string): return Path(string) @@ -203,15 +214,36 @@ def csv_to_parquet( @task def upload_to_gcs( - to_upload: Union[str, Path], gcs_folder: str, config: Config, gcp_env: str = "dev", + to_upload: Union[str, Path], + gcs_folder: str, + config: Config, + gcp_env: str = "dev", + credentials: Credentials = None, ) -> list: + """Uploads a single file or multiple files in a single directory to Google Cloud Storage. 
+ Parameters + ---------- + to_upload: Path + Path object to a directory containing files to be uploaded + gcs_folder: str + GCS folder (=blob) to upload the files into + config: Config + Config object holding GCP configurations + gcp_env: str + determines which GCP configuration to use from config.gcp + + Returns + ------- + gcs_folder: str + The folder (=blob) into which the tables have been uploaded # TODO -> Return success/ fail code?/job ID + """ to_upload = Path(to_upload) # Set GCP params gcp = nlu.set_gcp(config=config, gcp_env=gcp_env) gcs_folder = gcs_folder.rstrip("/") - gcs = storage.Client(project=gcp.project_id) + gcs = storage.Client(project=gcp.project_id, credentials=credentials) gcs_bucket = gcs.get_bucket(gcp.bucket) # List to return blob ids ids = [] @@ -235,6 +267,7 @@ def gcs_to_bq( dataset_name: str, config: Config = None, gcp_env: str = "dev", + credentials: Credentials = None, **kwargs, ): gcp = nlu.set_gcp(config=config, gcp_env=gcp_env) @@ -246,15 +279,17 @@ def gcs_to_bq( dataset_id = dataset_name # Check if dataset exists and delete if it does TODO: maybe delete anyway (deleting currently uses not_found_ok to ignore error if does not exist) - if nlu.check_bq_dataset(dataset_id=dataset_id, gcp=gcp): - nlu.delete_bq_dataset(dataset_id=dataset_id, gcp=gcp) + if nlu.check_bq_dataset(dataset_id=dataset_id, gcp=gcp, credentials=credentials): + nlu.delete_bq_dataset(dataset_id=dataset_id, gcp=gcp, credentials=credentials) # Create dataset and reset dataset_id to new dataset - dataset_id = nlu.create_bq_dataset(name=dataset_name, gcp=gcp, **kwargs) + dataset_id = nlu.create_bq_dataset( + name=dataset_name, gcp=gcp, **kwargs, credentials=credentials + ) # Link parquet files in GCS to tables in BQ dataset tables = nlu.link_parquet_to_bq_dataset( - gcs_folder=gcs_folder, gcp=gcp, dataset_id=dataset_id + gcs_folder=gcs_folder, gcp=gcp, dataset_id=dataset_id, credentials=credentials ) return tables diff --git a/nl_open_data/utils.py b/nl_open_data/utils.py index 58743ed..02dd06d 100644 --- a/nl_open_data/utils.py +++ b/nl_open_data/utils.py @@ -4,18 +4,20 @@ from google.cloud import storage from google.cloud import bigquery from google.cloud import exceptions -from google.oauth2 import service_account +from google.oauth2.credentials import Credentials + +# from google.oauth2 import service_account from statline_bq.config import Config, GcpProject -def get_gcp_credentials( - service_account_info, -): # ADD TYPE CHECKING service_account_info / Credentials - credentials = service_account.Credentials.from_service_account_info( - service_account_info - ) - return credentials +# def get_gcp_credentials( +# service_account_info, +# ): # ADD TYPE CHECKING service_account_info / Credentials +# credentials = service_account.Credentials.from_service_account_info( +# service_account_info +# ) +# return credentials def create_dir_util(path: Union[Path, str]) -> Path: @@ -52,7 +54,9 @@ def set_gcp(config: Config, gcp_env: str) -> GcpProject: return config_envs[gcp_env] -def check_bq_dataset(dataset_id: str, gcp: GcpProject, credentials: str = None) -> bool: +def check_bq_dataset( + dataset_id: str, gcp: GcpProject, credentials: Credentials = None +) -> bool: """Check if dataset exists in BQ. 
Parameters @@ -76,7 +80,9 @@ def check_bq_dataset(dataset_id: str, gcp: GcpProject, credentials: str = None) return False -def delete_bq_dataset(dataset_id: str, gcp: GcpProject) -> None: +def delete_bq_dataset( + dataset_id: str, gcp: GcpProject, credentials: Credentials = None +) -> None: """Delete an exisiting dataset from Google Big Query. If dataset does not exists, does nothing. @@ -94,16 +100,22 @@ def delete_bq_dataset(dataset_id: str, gcp: GcpProject) -> None: """ # Construct a bq client - client = bigquery.Client(project=gcp.project_id) + client = bigquery.Client(project=gcp.project_id, credentials=credentials) # Delete the dataset and its contents - client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True) + client.delete_dataset( + dataset_id, delete_contents=True, not_found_ok=True, credentials=credentials + ) return None def create_bq_dataset( - name: str, gcp: GcpProject, source: str = None, description: str = None, + name: str, + gcp: GcpProject, + source: str = None, + description: str = None, + credentials: Credentials = None, ) -> str: """Creates a dataset in Google Big Query. If dataset exists already exists, does nothing. @@ -125,7 +137,7 @@ def create_bq_dataset( """ # Construct a BigQuery client object. - client = bigquery.Client(project=gcp.project_id) + client = bigquery.Client(project=gcp.project_id, credentials=credentials) # Set dataset_id to the ID of the dataset to create. dataset_id = f"{client.project}.{source}_{name}" @@ -151,15 +163,17 @@ def create_bq_dataset( return dataset.dataset_id -def link_parquet_to_bq_dataset(gcs_folder: str, gcp: GcpProject, dataset_id: str): +def link_parquet_to_bq_dataset( + gcs_folder: str, gcp: GcpProject, dataset_id: str, credentials: Credentials = None, +): # Get blobs within gcs_folder - storage_client = storage.Client(project=gcp.project_id) + storage_client = storage.Client(project=gcp.project_id, credentials=credentials) blobs = storage_client.list_blobs(gcp.bucket, prefix=gcs_folder) names = [blob.name for blob in blobs] # Initialize client - bq_client = bigquery.Client(project=gcp.project_id) + bq_client = bigquery.Client(project=gcp.project_id, credentials=credentials) # Configure the external data source dataset_ref = bigquery.DatasetReference(gcp.project_id, dataset_id) diff --git a/poetry.lock b/poetry.lock index e3f75a5..b2b0e05 100644 --- a/poetry.lock +++ b/poetry.lock @@ -163,6 +163,14 @@ delayed = ["cloudpickle (>=0.2.2)", "toolz (>=0.8.2)"] diagnostics = ["bokeh (>=1.0.0,!=2.0.0)"] distributed = ["distributed (>=2.0)"] +[[package]] +name = "dataclasses" +version = "0.6" +description = "A backport of the dataclasses module for Python 3.6" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "dataclasses-json" version = "0.5.2" @@ -984,13 +992,14 @@ python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" [[package]] name = "pyserde" -version = "0.2.1" +version = "0.2.2" description = "Serialization library on top of dataclasses." 
category = "main" optional = false python-versions = ">=3.6" [package.dependencies] +dataclasses = "*" jinja2 = "*" stringcase = "*" toml = {version = "*", optional = true, markers = "extra == \"toml\""} @@ -1164,12 +1173,12 @@ python-versions = "*" [[package]] name = "statline-bq" -version = "0.1.0" +version = "0.1.2" description = "Library to upload CBS open datasets into Google Cloud Platform" category = "main" optional = false python-versions = "^3.8" -develop = true +develop = false [package.dependencies] click = "^7.1.2" @@ -1179,7 +1188,6 @@ google-auth = "^1.19.2" google-cloud-bigquery = "^1.26.1" google-cloud-core = "^1.3.0" google-cloud-storage = "^1.30.0" -prefect = "^0.14.0" pyarrow = "^2.0.0" pyserde = {version = "^0.2.0", extras = ["toml"]} requests = "^2.24.0" @@ -1187,8 +1195,10 @@ toml = "^0.10.2" tomlkit = "^0.7.0" [package.source] -type = "directory" -url = "../statline-bq" +type = "git" +url = "https://github.com/dataverbinders/statline-bq.git" +reference = "main" +resolved_reference = "7dcfcfd863318e03cc09b0cbd9efe2ce39c41a4d" [[package]] name = "stringcase" @@ -1353,7 +1363,7 @@ heapdict = "*" [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "b688a6d27422e61af316320c27625b6f42125ff0f8948e512712a08551c268a8" +content-hash = "ef5abb95089d94a459ea35a4ab4428eb926fae8da8e4f6300ace344e931415e3" [metadata.files] appdirs = [ @@ -1452,6 +1462,10 @@ dask = [ {file = "dask-2.30.0-py3-none-any.whl", hash = "sha256:4c215aa55951f570b5a294b2ce964ed801c6efd766231c53447460e60f73ea14"}, {file = "dask-2.30.0.tar.gz", hash = "sha256:a1669022e25de99b227c3d83da4801f032415962dac431099bf0534648e41a54"}, ] +dataclasses = [ + {file = "dataclasses-0.6-py3-none-any.whl", hash = "sha256:454a69d788c7fda44efd71e259be79577822f5e3f53f029a22d08004e951dc9f"}, + {file = "dataclasses-0.6.tar.gz", hash = "sha256:6988bd2b895eef432d562370bb707d540f32f7360ab13da45340101bc2307d84"}, +] dataclasses-json = [ {file = "dataclasses-json-0.5.2.tar.gz", hash = "sha256:56ec931959ede74b5dedf65cf20772e6a79764d20c404794cce0111c88c085ff"}, {file = "dataclasses_json-0.5.2-py3-none-any.whl", hash = "sha256:b746c48d9d8e884e2a0ffa59c6220a1b21f94d4f9f12c839da0a8a0efd36dc19"}, @@ -1961,8 +1975,8 @@ pyparsing = [ {file = "pyparsing-2.4.7.tar.gz", hash = "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1"}, ] pyserde = [ - {file = "pyserde-0.2.1-py3-none-any.whl", hash = "sha256:891289d8b2eb67fb202ec2cccb817c9c188497a83bc21d9c0db969108048d9aa"}, - {file = "pyserde-0.2.1.tar.gz", hash = "sha256:7b4cb6b4a5c806d65ab260664357620e72ef680604f9f541e1ab7c24b777863a"}, + {file = "pyserde-0.2.2-py3-none-any.whl", hash = "sha256:bef911d6223c7aa4ad697b791e99d01b871a972876eb2cc01aad9a80f8debb4a"}, + {file = "pyserde-0.2.2.tar.gz", hash = "sha256:a132e8ffef59ac8be6012c6df5ddf13d53f8cc110bb920cb0530f339f0527423"}, ] pytest = [ {file = "pytest-5.4.3-py3-none-any.whl", hash = "sha256:5c0db86b698e8f170ba4582a492248919255fcd4c79b1ee64ace34301fb589a1"}, diff --git a/pyproject.toml b/pyproject.toml index 84381a7..332256a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ statline_bq = {git = "https://github.com/dataverbinders/statline-bq.git", branch # statline_bq = {path = "/Users/tslilstrauss/Projects/statline-bq", develop = true} # takes statline-bq from a local path pandas = "^1.1.5" python-box = "^5.2.0" +dask = "^2.21.0" [tool.poetry.dev-dependencies] black = "^19.10b0"
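
# A minimal sketch of seeding the "GCP_CREDENTIALS" secret that the run scripts in this
# series read, assuming the secret lives in Prefect Cloud and is set via Client.set_secret;
# the key-file path is a placeholder. Storing the parsed key file means
# Secret("GCP_CREDENTIALS").get() later returns the same mapping that get_gcp_credentials
# hands to service_account.Credentials.from_service_account_info().

import json

from prefect import Client

client = Client()  # assumes a Prefect Cloud API key has already been configured locally
client.login_to_tenant(tenant_slug="dataverbinders")

# load the downloaded service-account key file as a dict
with open("/path/to/service-account-key.json") as f:
    service_account_info = json.load(f)

# store it in Prefect Cloud under the name used throughout these patches
client.set_secret(name="GCP_CREDENTIALS", value=service_account_info)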