From 6a728fe66db0286ea10301d1fe693d6dcba4e4f4 Mon Sep 17 00:00:00 2001
From: Nicholas Zeolla
Date: Wed, 13 Sep 2023 20:17:40 +0100
Subject: [PATCH] feat: Add support for `table_create_disposition` in bigquery
 job for offline store (#3762)

* Add bigquery table create disposition to offline store

Signed-off-by: Nick Zeolla

* linting

Signed-off-by: Nick Zeolla

---------

Signed-off-by: Nick Zeolla
---
 sdk/python/feast/infra/offline_stores/bigquery.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 5913b60f62..86c587c7fd 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -19,7 +19,7 @@
 import pandas as pd
 import pyarrow
 import pyarrow.parquet
-from pydantic import StrictStr, validator
+from pydantic import ConstrainedStr, StrictStr, validator
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
 
@@ -72,6 +72,13 @@ def get_http_client_info():
     return http_client_info.ClientInfo(user_agent=get_user_agent())
 
 
+class BigQueryTableCreateDisposition(ConstrainedStr):
+    """Custom constraint for table_create_disposition. To understand more, see:
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""
+
+    values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}
+
+
 class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     """Offline store config for GCP BigQuery"""
 
@@ -95,6 +102,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     gcs_staging_location: Optional[str] = None
     """ (optional) GCS location used for offloading BigQuery results as parquet files."""
 
+    table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
+    """ (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
+
     @validator("billing_project_id")
     def project_id_exists(cls, v, values, **kwargs):
         if v and not values["project_id"]:
@@ -324,6 +334,7 @@ def write_logged_features(
         job_config = bigquery.LoadJobConfig(
             source_format=bigquery.SourceFormat.PARQUET,
             schema=arrow_schema_to_bq_schema(source.get_schema(registry)),
+            create_disposition=config.offline_store.table_create_disposition,
             time_partitioning=bigquery.TimePartitioning(
                 type_=bigquery.TimePartitioningType.DAY,
                 field=source.get_log_timestamp_column(),
@@ -384,6 +395,7 @@ def offline_write_batch(
         job_config = bigquery.LoadJobConfig(
             source_format=bigquery.SourceFormat.PARQUET,
             schema=arrow_schema_to_bq_schema(pa_schema),
+            create_disposition=config.offline_store.table_create_disposition,
             write_disposition="WRITE_APPEND",  # Default but included for clarity
         )
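
Note (not part of the patch): below is a minimal Python sketch of how the new `table_create_disposition` option is intended to flow from `BigQueryOfflineStoreConfig` into a BigQuery load job. It assumes `feast` and `google-cloud-bigquery` are installed; the dataset and project names are hypothetical, and the `LoadJobConfig` call mirrors, rather than reproduces, the code touched by this change.

    # Illustrative sketch only -- names below mirror the diff; dataset and
    # project id are hypothetical placeholders.
    from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
    from google.cloud import bigquery

    # The new field is optional; when set it should be one of BigQuery's
    # create dispositions ("CREATE_IF_NEEDED" or "CREATE_NEVER").
    config = BigQueryOfflineStoreConfig(
        dataset="feast_offline",            # hypothetical dataset
        project_id="my-gcp-project",        # hypothetical project
        table_create_disposition="CREATE_NEVER",
    )

    # The patch forwards the configured value into the load job, roughly:
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        create_disposition=config.table_create_disposition,
        write_disposition="WRITE_APPEND",
    )
    print(job_config.create_disposition)  # CREATE_NEVER

When the option is left unset, `create_disposition` is passed as None and BigQuery falls back to its default behavior, CREATE_IF_NEEDED.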