Fix list feature format for BigQuery offline datasources. (#1889)
* Enable test for list features in BQ

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Remove specific handling of BQ list types

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Enable Parquet list inference for BigQuery

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Enable `use_compliant_nested_type`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Add potentially missing import

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Upload all data to BQ in ARRAY safe manner

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Handle empty list in `python_value_to_proto_value`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
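
The core idea, as a minimal sketch (pure pyarrow, no BigQuery connection needed; assumes pyarrow >= 4.0): `use_compliant_nested_type=True` makes pyarrow write list columns in the Parquet-spec LIST layout, which is the shape BigQuery's Parquet list inference can map onto ARRAY<value_type> instead of a wrapper STRUCT.

# Hedged illustration: compare the Parquet list layout written with and without
# use_compliant_nested_type; only the compliant layout matches the Parquet-spec
# LIST structure that BigQuery's enable_list_inference relies on.
import pyarrow
import pyarrow.parquet

table = pyarrow.table({"value": [[1, 2], [3]]})

for compliant in (False, True):
    buf = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(table, buf, use_compliant_nested_type=compliant)
    schema = pyarrow.parquet.read_schema(pyarrow.BufferReader(buf.getvalue()))
    # The inner list field name differs between the two layouts.
    print(compliant, schema.field("value").type)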
judahrand authored Sep 21, 2021
1 parent af219e7 commit 21f1ef7
Showing 6 changed files with 74 additions and 60 deletions.
53 changes: 42 additions & 11 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -5,6 +5,7 @@
 import numpy as np
 import pandas as pd
 import pyarrow
+import pyarrow.parquet
 from pydantic import StrictStr
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
@@ -222,11 +223,8 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)

         if not job_config.dry_run and self.on_demand_feature_views is not None:
-            transformed_df = self.to_df()
-            job = self.client.load_table_from_dataframe(
-                transformed_df,
-                job_config.destination,
-                job_config=bigquery.LoadJobConfig(),
+            job = _write_pyarrow_table_to_bq(
+                self.client, self.to_arrow(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -331,12 +329,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-
-        # Upload the dataframe into BigQuery, creating a temporary table
-        job_config = bigquery.LoadJobConfig()
-        job = client.load_table_from_dataframe(
-            entity_df, table_name, job_config=job_config
-        )
+        job = _write_df_to_bq(client, entity_df, table_name)
         block_until_done(client, job)

         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
@@ -371,6 +364,44 @@ def _get_bigquery_client(project: Optional[str] = None):
     return client


+def _write_df_to_bq(
+    client: bigquery.Client, df: pd.DataFrame, table_name: str
+) -> bigquery.LoadJob:
+    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
+    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+    # https://github.com/googleapis/python-bigquery/issues/19
+    writer = pyarrow.BufferOutputStream()
+    pyarrow.parquet.write_table(
+        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
+    )
+    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
+
+
+def _write_pyarrow_table_to_bq(
+    client: bigquery.Client, table: pyarrow.Table, table_name: str
+) -> bigquery.LoadJob:
+    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
+    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+    # https://github.com/googleapis/python-bigquery/issues/19
+    writer = pyarrow.BufferOutputStream()
+    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
+    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
+
+
+def _write_pyarrow_buffer_to_bq(
+    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
+) -> bigquery.LoadJob:
+    reader = pyarrow.BufferReader(buf)
+
+    parquet_options = bigquery.format_options.ParquetOptions()
+    parquet_options.enable_list_inference = True
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.PARQUET
+    job_config.parquet_options = parquet_options
+
+    return client.load_table_from_file(reader, table_name, job_config=job_config,)
+
+
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
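A hedged usage sketch of the helper added above (the destination table id is hypothetical, and reachable BigQuery credentials are assumed): list-typed DataFrame columns are routed through an in-memory Parquet buffer so they arrive as ARRAY<value_type> columns.

import pandas as pd
from google.cloud import bigquery

from feast.infra.offline_stores.bigquery import _write_df_to_bq

client = bigquery.Client()
df = pd.DataFrame(
    {"driver_id": [1001, 1002], "trip_ids": [[1, 2, 3], [4]]}  # list-typed column
)

# Hypothetical destination table; the helper writes the frame to Parquet with
# compliant nested types and loads it with list inference enabled.
job = _write_df_to_bq(client, df, "my_project.my_dataset.driver_trips")
job.result()  # trip_ids lands as an ARRAY column rather than a nested STRUCT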
15 changes: 10 additions & 5 deletions sdk/python/feast/type_map.py
@@ -277,11 +277,16 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
 def python_value_to_proto_value(
     value: Any, feature_type: ValueType = None
 ) -> ProtoValue:
-    value_type = (
-        python_type_to_feast_value_type("", value)
-        if value is not None
-        else feature_type
-    )
+    value_type = feature_type
+    if value is not None:
+        if isinstance(value, (list, np.ndarray)):
+            value_type = (
+                feature_type
+                if len(value) == 0
+                else python_type_to_feast_value_type("", value)
+            )
+        else:
+            value_type = python_type_to_feast_value_type("", value)
     return _python_value_to_proto_value(value_type, value)


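A short sketch of the behaviour the type_map change above is after (assumes feast's ValueType enum exposes INT64_LIST, as it does for the other list value types): an empty list carries no element to infer a type from, so the declared feature type is used instead.

from feast.type_map import python_value_to_proto_value
from feast.value_type import ValueType

# Non-empty list: the value type can still be inferred from the elements.
proto = python_value_to_proto_value([1, 2, 3], ValueType.INT64_LIST)

# Empty list: there is nothing to infer from, so the declared feature type
# is used rather than looking at value[0].
empty_proto = python_value_to_proto_value([], ValueType.INT64_LIST)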
@@ -6,7 +6,10 @@

 from feast import BigQuerySource
 from feast.data_source import DataSource
-from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStoreConfig,
+    _write_df_to_bq,
+)
 from tests.integration.feature_repos.universal.data_source_creator import (
     DataSourceCreator,
 )
@@ -61,15 +64,12 @@ def create_data_source(

         self.create_dataset()

-        job_config = bigquery.LoadJobConfig()
         if self.gcp_project not in destination_name:
             destination_name = (
                 f"{self.gcp_project}.{self.project_name}.{destination_name}"
             )

-        job = self.client.load_table_from_dataframe(
-            df, destination_name, job_config=job_config
-        )
+        job = _write_df_to_bq(self.client, df, destination_name)
         job.result()

         self.tables.append(destination_name)
@@ -19,7 +19,10 @@
 from feast.feature import Feature
 from feast.feature_store import FeatureStore, _validate_feature_refs
 from feast.feature_view import FeatureView
-from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStoreConfig,
+    _write_df_to_bq,
+)
 from feast.infra.offline_stores.offline_utils import (
     DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
 )
@@ -62,9 +65,8 @@ def stage_driver_hourly_stats_parquet_source(directory, df):

 def stage_driver_hourly_stats_bigquery_source(df, table_id):
     client = bigquery.Client()
-    job_config = bigquery.LoadJobConfig()
     df.reset_index(drop=True, inplace=True)
-    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
+    job = _write_df_to_bq(client, df, table_id)
     job.result()


@@ -99,9 +101,8 @@ def feature_service(name: str, views) -> FeatureService:

 def stage_customer_daily_profile_bigquery_source(df, table_id):
     client = bigquery.Client()
-    job_config = bigquery.LoadJobConfig()
     df.reset_index(drop=True, inplace=True)
-    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
+    job = _write_df_to_bq(client, df, table_id)
     job.result()


@@ -231,9 +232,8 @@ def get_expected_training_df(

 def stage_orders_bigquery(df, table_id):
     client = bigquery.Client()
-    job_config = bigquery.LoadJobConfig()
     df.reset_index(drop=True, inplace=True)
-    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
+    job = _write_df_to_bq(client, df, table_id)
     job.result()


37 changes: 8 additions & 29 deletions sdk/python/tests/integration/registration/test_universal_types.py
@@ -36,13 +36,6 @@ def populate_test_configs(offline: bool):
         # For offline tests, don't need to vary for online store
         if offline and test_repo_config.online_store == REDIS_CONFIG:
             continue
-        # TODO(https://github.com/feast-dev/feast/issues/1839): Fix BQ materialization of list features
-        if (
-            not offline
-            and test_repo_config.provider == "gcp"
-            and feature_is_list is True
-        ):
-            continue
         configs.append(
             TypeTestConfig(
                 entity_type=entity_type,
@@ -255,16 +248,10 @@ def assert_feature_list_types(
         "bool": "bool",
     }
     assert str(historical_features_df.dtypes["value"]) == "object"
-    if provider == "gcp":
-        assert (
-            feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
-            in type(historical_features_df.value[0]["list"][0]["item"]).__name__
-        )
-    else:
-        assert (
-            feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
-            in type(historical_features_df.value[0][0]).__name__
-        )
+    assert (
+        feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
+        in type(historical_features_df.value[0][0]).__name__
+    )


def assert_expected_arrow_types(
Expand All @@ -287,18 +274,10 @@ def assert_expected_arrow_types(
feature_dtype
]
if feature_is_list:
if provider == "gcp":
assert str(
historical_features_arrow.schema.field_by_name("value").type
) in [
f"struct<list: list<item: struct<item: {arrow_type}>> not null>",
f"struct<list: list<item: struct<item: {arrow_type}>>>",
]
else:
assert (
str(historical_features_arrow.schema.field_by_name("value").type)
== f"list<item: {arrow_type}>"
)
assert (
str(historical_features_arrow.schema.field_by_name("value").type)
== f"list<item: {arrow_type}>"
)
else:
assert (
str(historical_features_arrow.schema.field_by_name("value").type)
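For reference, a tiny illustration of the shape the simplified assertions now expect from every offline store (pure pyarrow, no warehouse needed): list features come back as a plain Arrow list type rather than the BigQuery-specific struct wrapping that the deleted branches tolerated.

import pyarrow

# The string assert_expected_arrow_types builds for an int64 list feature.
arrow_type = "int64"
assert str(pyarrow.list_(pyarrow.int64())) == f"list<item: {arrow_type}>"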
5 changes: 2 additions & 3 deletions sdk/python/tests/utils/data_source_utils.py
@@ -7,6 +7,7 @@

 from feast import BigQuerySource, FileSource
 from feast.data_format import ParquetFormat
+from feast.infra.offline_stores.bigquery import _write_df_to_bq


 @contextlib.contextmanager
@@ -38,9 +39,7 @@ def simple_bq_source_using_table_ref_arg(
     client.update_dataset(dataset, ["default_table_expiration_ms"])
     table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"

-    job = client.load_table_from_dataframe(
-        df, table_ref, job_config=bigquery.LoadJobConfig()
-    )
+    job = _write_df_to_bq(client, df, table_ref)
     job.result()

     return BigQuerySource(
