Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Using secrets to authenticate to GCP #53

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ dask-worker-space/

# Local testing notebook
test.ipynb
test2.ipynb

# Local temp files
temp/

# Prefect token
prefect_cloud_token.txt
# Prefect secrets
secrets/
65 changes: 52 additions & 13 deletions nl_open_data/flows/cbs/register_statline_bq_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

from datetime import datetime

# from box import Box
from prefect import task, Flow, unmapped, Parameter
from prefect.client import Secret
from prefect.executors import DaskExecutor
from statline_bq.utils import (
check_gcp_env,
Expand All @@ -37,7 +37,7 @@
bq_update_main_table_col_descriptions,
)

from nl_open_data.tasks import upper, lower, remove_dir, skip_task
from nl_open_data.tasks import get_gcp_credentials, upper, lower, remove_dir, skip_task

# Converting statline-bq functions to tasks
check_gcp_env = task(check_gcp_env)
Expand Down Expand Up @@ -82,14 +82,21 @@
force : bool, default = False
If set to True, processes datasets, even if Modified dates are
identical in source and target locations.

service_account_info : str (JSON)
A service account info from GCP provided as a JSON string, such
as the one required by:
google.oauth2.service_account.Credentials.from_service_account_info()
"""

ids = Parameter("ids")
source = Parameter("source", default="cbs")
third_party = Parameter("third_party", default=False)
gcp_env = Parameter("gcp_env", default="dev")
force = Parameter("force", default=False)
service_account_info = Parameter("service_account_info")

gcp_credentials = get_gcp_credentials(service_account_info)
ids = upper.map(
ids
) # TODO: Do we need a different variable name here (ids_upper = ...)?
Expand All @@ -101,7 +108,11 @@
)
source_metas = get_metadata_cbs.map(urls=urls, odata_version=odata_versions)
gcp_metas = get_metadata_gcp.map(
id=ids, source=unmapped(source), odata_version=odata_versions, gcp=unmapped(gcp)
id=ids,
source=unmapped(source),
odata_version=odata_versions,
gcp=unmapped(gcp),
credentials=unmapped(gcp_credentials),
) # TODO: skip if force=True
cbs_modifieds = get_from_meta.map(meta=source_metas, key=unmapped("Modified"))
gcp_modifieds = get_gcp_modified.map(
Expand Down Expand Up @@ -154,6 +165,7 @@
id=ids,
config=unmapped(config),
gcp_env=unmapped(gcp_env),
credentials=unmapped(gcp_credentials),
upstream_tasks=[files_parquet, col_desc_files, go_nogo],
)
file_names = get_file_names.map(files_parquet, upstream_tasks=[go_nogo],)
Expand All @@ -166,6 +178,7 @@
gcs_folder=gcs_folders,
file_names=file_names,
gcp_env=unmapped(gcp_env),
credentials=unmapped(gcp_credentials),
upstream_tasks=[gcs_folders, go_nogo],
)
desc_dicts = get_col_descs_from_gcs.map(
Expand All @@ -175,14 +188,16 @@
config=unmapped(config),
gcp_env=unmapped(gcp_env),
gcs_folder=gcs_folders,
credentials=unmapped(gcp_credentials),
upstream_tasks=[gcs_folders, go_nogo],
)
bq_updates = bq_update_main_table_col_descriptions.map(
dataset_ref=dataset_refs,
descriptions=desc_dicts,
config=unmapped(config),
gcp_env=unmapped(gcp_env),
upstream_tasks=[desc_dicts, go_nogo],
credentials=unmapped(gcp_credentials),
upstream_tasks=[desc_dicts, go_nogo], # TODO: Add dataset_refs as upstream?
)
remove = remove_dir.map(
pq_dir, upstream_tasks=[gcs_folders]
Expand All @@ -192,22 +207,46 @@
if __name__ == "__main__":
# Register flow
statline_flow.executor = DaskExecutor()
print("Registration Output")
print("------------------------")
flow_id = statline_flow.register(
project_name="nl_open_data", version_group_id="statline_bq"
)
print(f" └── Registered on: {datetime.today()}")

"""
Output last registration
------------------------
Flow URL: https://cloud.prefect.io/dataverbinders/flow/eef07631-c5d3-4313-9b2c-41b1e8d180a8
└── ID: 2dedcace-27ec-42b9-8be7-dcdd954078e4
└── Project: nl_open_data
└── Labels: ['tud0029822']
└── Registered on: 2021-01-12 14:52:31.387941
"""
########################

# Run locally
# ids = ["83583ned"]
# ids = ["83583NED", "83765NED", "84799NED", "84583NED", "84286NED"]
# state = statline_flow.run(parameters={"ids": ids, "force": False})

########################

# Schedule flow-run on prefect cloud
from prefect import Client

STATLINE_VERSION_GROUP_ID = "statline_bq"

TENANT_SLUG = "dataverbinders"
DATA = ["83583NED"]
SOURCE = "cbs"
THIRD_PARTY = False
GCP_ENV = "dev"
FORCE = False
SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get()

client = Client() # Local api key has been stored previously
client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token

statline_parameters = {
"ids": DATA,
"source": SOURCE,
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
"service_account_info": SERVICE_ACCOUNT_INFO,
}
flow_run_id = client.create_flow_run(
version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters
)
16 changes: 16 additions & 0 deletions nl_open_data/flows/cbs/register_temp_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Register a temporary flow used to test GCP secret-based authentication.

The flow receives a GCP service-account info JSON string as a Prefect
Parameter, turns it into credentials via ``get_gcp_credentials``, and runs
``check_bq_dataset`` against a known dataset id so a successful flow run
proves the supplied secret actually authenticates against BigQuery.
"""
# the config object must be imported from config.py before any Prefect imports
from nl_open_data.config import config
from prefect import Flow, task, Parameter

from nl_open_data.utils import check_bq_dataset
from nl_open_data.tasks import get_gcp_credentials

# Wrap the plain function as a Prefect task (same pattern as the other flows).
check_bq_dataset = task(check_bq_dataset)

with Flow("test_secrets") as flow:
    # Service-account info as a JSON string, supplied at flow-run time
    # (see run_temp_secrets.py, which reads it from a Prefect Secret).
    service_account_info = Parameter("service_account_info")
    gcp_credentials = get_gcp_credentials(service_account_info)
    # Any authenticated BigQuery call would do; this one checks a dataset.
    check_bq_dataset(
        dataset_id="83583NED", gcp=config.gcp.dev, credentials=gcp_credentials
    )

# Keyword arguments for consistency with register_statline_bq_flow.py.
flow.register(project_name="nl_open_data", version_group_id="test_secrets")
9 changes: 7 additions & 2 deletions nl_open_data/flows/cbs/register_zip_csv_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
"bq_dataset_description", default=None
) # TODO: implement
source = Parameter("source", required=False)
service_account_info = Parameter("service_account_info")

gcp_credentials = nlt.get_gcp_credentials(service_account_info)

filename = nlt.get_filename_from_url(url)
filepath = local_folder / nlt.path_wrap(filename)
Expand All @@ -74,6 +77,7 @@
gcs_folder=unmapped(gcs_folder),
config=unmapped(config),
gcp_env=unmapped(gcp_env),
credentials=unmapped(gcp_credentials),
upstream_tasks=[pq_files],
)
tables = nlt.gcs_to_bq(
Expand All @@ -82,6 +86,7 @@
config=config,
gcp_env=gcp_env,
source=source,
credentials=unmapped(gcp_credentials),
description=bq_dataset_description,
upstream_tasks=[gcs_ids],
)
Expand All @@ -102,8 +107,8 @@
------------------------
Result check: OK
Flow URL: https://cloud.prefect.io/dataverbinders/flow/24e3c567-88c7-4a6e-8333-72a9cd1abebd
└── ID: b91257e3-7c63-468c-9460-c7403e602a0a
└── ID: dfe6b519-e20d-4de8-8188-8bf195168d83
└── Project: nl_open_data
└── Labels: ['tud0029822']
└── Registered on: 2021-01-12 18:27:56.586313
└── Registered on: 2021-01-26 12:58:09.831253
"""
5 changes: 5 additions & 0 deletions nl_open_data/flows/cbs/run_mlz.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

[^mlz]: https://mlzopendata.cbs.nl/#/MLZ/nl/
"""
# the config object must be imported from config.py before any Prefect imports
from nl_open_data.config import config

from prefect import Client
from prefect.client import Secret

VERSION_GROUP_ID = "statline_bq"

Expand All @@ -15,6 +18,7 @@
THIRD_PARTY = True
GCP_ENV = "dev"
FORCE = False
SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get()

client = Client() # Local api key has been stored previously
client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token
Expand All @@ -24,6 +28,7 @@
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
"service_account_info": SERVICE_ACCOUNT_INFO,
}
flow_run_id = client.create_flow_run(
version_group_id=VERSION_GROUP_ID, parameters=parameters
Expand Down
14 changes: 10 additions & 4 deletions nl_open_data/flows/cbs/run_regionaal.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
"""
from pathlib import Path

# the config object must be imported from config.py before any Prefect imports
from nl_open_data.config import config

from prefect import Client
from prefect.client import Secret

STATLINE_VERSION_GROUP_ID = "statline_bq"
ZIP_VERSION_GROUP_ID = "zipped_csv"

TENANT_SLUG = "dataverbinders"
ODATA_REGIONAAL = [ # TODO: check datasets, add and organize
# Regionale kerncijfers Nederland
"70072NED"
"70072NED",
# Kerncijfers wijken en buurten
"84583NED", # 2019
"84286NED", # 2018
Expand All @@ -44,6 +47,7 @@
THIRD_PARTY = False
GCP_ENV = "dev"
FORCE = False
SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get()

client = Client() # Local api key has been stored previously
client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token
Expand All @@ -55,10 +59,11 @@
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
"service_account_info": SERVICE_ACCOUNT_INFO,
}
# flow_run_id = client.create_flow_run(
# version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters
# )
flow_run_id = client.create_flow_run(
version_group_id=STATLINE_VERSION_GROUP_ID, parameters=statline_parameters
)

####################

Expand All @@ -82,6 +87,7 @@
"bq_dataset_name": BQ_DATASET_NAME,
"bq_dataset_description": BQ_DATASET_DESCRIPTION,
"source": SOURCE,
"service_account_info": SERVICE_ACCOUNT_INFO,
}

flow_run_id = client.create_flow_run(
Expand Down
5 changes: 5 additions & 0 deletions nl_open_data/flows/cbs/run_rivm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

[^rivm]: https://statline.rivm.nl/#/RIVM/nl/dataset/50052NED/table?ts=1589622516137
"""
# the config object must be imported from config.py before any Prefect imports
from nl_open_data.config import config

from prefect import Client
from prefect.client import Secret

VERSION_GROUP_ID = "statline_bq"

Expand All @@ -17,6 +20,7 @@
THIRD_PARTY = True
GCP_ENV = "dev"
FORCE = False
SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get()

client = Client() # Local api key has been stored previously
client.login_to_tenant(tenant_slug=TENANT_SLUG) # For user-scoped API token
Expand All @@ -26,6 +30,7 @@
"third_party": THIRD_PARTY,
"gcp_env": GCP_ENV,
"force": FORCE,
"service_account_info": SERVICE_ACCOUNT_INFO,
}
flow_run_id = client.create_flow_run(
version_group_id=VERSION_GROUP_ID, parameters=parameters
Expand Down
15 changes: 15 additions & 0 deletions nl_open_data/flows/cbs/run_temp_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Trigger a run of the ``test_secrets`` flow on Prefect Cloud.

Reads the GCP service-account info from the locally stored
``GCP_CREDENTIALS`` Prefect secret and passes it to the flow as the
``service_account_info`` parameter.
"""
# the config object must be imported from config.py before any Prefect imports
from nl_open_data.config import config

from prefect import Client
from prefect.client import Secret

TENANT_SLUG = "dataverbinders"
# Service-account info JSON string, stored locally as a Prefect secret.
GCP_SERVICE_ACCOUNT_INFO = Secret("GCP_CREDENTIALS").get()

prefect_client = Client()  # Local api key has been stored previously
prefect_client.login_to_tenant(tenant_slug=TENANT_SLUG)  # For user-scoped API token

flow_parameters = {"service_account_info": GCP_SERVICE_ACCOUNT_INFO}
prefect_client.create_flow_run(
    version_group_id="test_secrets", parameters=flow_parameters,
)
Loading