Download BigQuery table to pyarrow table for python-based ingestion flow #1366

Merged
merged 11 commits on Mar 12, 2021
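A rough, end-to-end sketch of the flow this change enables: define a BigQuery-backed source, wrap it in a feature view, and download the latest row per entity as a pyarrow.Table. Only the constructor arguments and the pull_latest_from_table call that appear in this diff are taken as given; the table name is made up, the Entity/Feature objects are replaced by simple stand-ins (the code in this diff only reads their .name), and FeatureView may accept further arguments not shown here.

from datetime import datetime, timedelta
from types import SimpleNamespace

from feast.big_query_source import BigQuerySource
from feast.feature_view import FeatureView
from feast.offline_store import BigQueryOfflineStore

# BigQuery source using the keyword arguments introduced in this diff.
source = BigQuerySource(
    event_timestamp_column="event_timestamp",
    table_ref="my_project.my_dataset.driver_hourly_stats",  # hypothetical table
    created_timestamp_column="created",
)

# Stand-ins for Entity/Feature objects; real ones would be used in practice.
view = FeatureView(
    name="driver_hourly_stats",
    entities=[SimpleNamespace(name="driver_id")],
    features=[SimpleNamespace(name="trips_today")],
    inputs=source,
    feature_start_time=datetime(2021, 1, 1),
)

# New in this PR: pull the deduplicated rows into a pyarrow.Table.
table = BigQueryOfflineStore.pull_latest_from_table(
    feature_view=view,
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)
print(table.schema)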
Changes from all commits
32 changes: 26 additions & 6 deletions sdk/python/feast/big_query_source.py
@@ -21,14 +21,34 @@ class BigQuerySource:

    def __init__(
        self,
-        table_ref: Optional[str],
        event_timestamp_column: str,
-        created_timestamp_column: Optional[str],
-        field_mapping: Optional[Dict[str, str]],
-        query: Optional[str],
+        table_ref: Optional[str] = None,
+        created_timestamp_column: Optional[str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
+        query: Optional[str] = None,
    ):
-        if (table_ref is None) != (query is None):
-            raise Exception("Exactly one of table_ref and query should be specified")
+        if (table_ref is None) == (query is None):
+            raise ValueError("Exactly one of table_ref and query should be specified")
+        if field_mapping is not None:
+            for value in field_mapping.values():
+                if list(field_mapping.values()).count(value) > 1:
+                    raise ValueError(
+                        f"Two fields cannot be mapped to the same name {value}"
+                    )
+
+            if event_timestamp_column in field_mapping.keys():
+                raise ValueError(
+                    f"The field {event_timestamp_column} is mapped to {field_mapping[event_timestamp_column]}. Please either remove this field mapping or use {field_mapping[event_timestamp_column]} as the event_timestamp_column."
+                )
+
+            if (
+                created_timestamp_column is not None
+                and created_timestamp_column in field_mapping.keys()
+            ):
+                raise ValueError(
+                    f"The field {created_timestamp_column} is mapped to {field_mapping[created_timestamp_column]}. Please either remove this field mapping or use {field_mapping[created_timestamp_column]} as the created_timestamp_column."
+                )
+
        self.table_ref = table_ref
        self.event_timestamp_column = event_timestamp_column
        self.created_timestamp_column = created_timestamp_column
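A small sketch of how the tightened BigQuerySource validation behaves, using only the constructor arguments shown above; the table, query, and column names are made up.

from feast.big_query_source import BigQuerySource

# Valid: exactly one of table_ref / query, and no conflicting field mapping.
ok = BigQuerySource(
    event_timestamp_column="event_timestamp",
    query="SELECT * FROM `my_project.my_dataset.driver_stats`",
    field_mapping={"ts_col": "event_timestamp"},  # raw column ts_col exposed as event_timestamp
)

# Neither (or both) of table_ref and query now raises ValueError.
try:
    BigQuerySource(event_timestamp_column="event_timestamp")
except ValueError as e:
    print(e)  # Exactly one of table_ref and query should be specified

# Two source fields mapped to the same destination name also raise ValueError.
try:
    BigQuerySource(
        event_timestamp_column="event_timestamp",
        table_ref="my_project.my_dataset.driver_stats",
        field_mapping={"a": "x", "b": "x"},
    )
except ValueError as e:
    print(e)  # Two fields cannot be mapped to the same name x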
4 changes: 2 additions & 2 deletions sdk/python/feast/feature_store.py
@@ -32,10 +32,10 @@ class FeatureStore:
    config: RepoConfig

    def __init__(
-        self, repo_path: Optional[str], config: Optional[RepoConfig],
+        self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
    ):
        if repo_path is not None and config is not None:
-            raise Exception("You cannot specify both repo_path and config")
+            raise ValueError("You cannot specify both repo_path and config")
        if config is not None:
            self.config = config
        elif repo_path is not None:
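In the same spirit, FeatureStore now defaults both arguments to None and rejects conflicting input with a ValueError rather than a bare Exception; a minimal sketch:

from feast.feature_store import FeatureStore

# Load configuration from a feature repository on disk.
store = FeatureStore(repo_path=".")

# Passing both repo_path and config raises
# ValueError("You cannot specify both repo_path and config").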
7 changes: 7 additions & 0 deletions sdk/python/feast/feature_view.py
@@ -35,6 +35,13 @@ def __init__(
        inputs: BigQuerySource,
        feature_start_time: datetime,
    ):
+        cols = [entity.name for entity in entities] + [feat.name for feat in features]
+        for col in cols:
+            if inputs.field_mapping is not None and col in inputs.field_mapping.keys():
+                raise ValueError(
+                    f"The field {col} is mapped to {inputs.field_mapping[col]} for this data source. Please either remove this field mapping or use {inputs.field_mapping[col]} as the Entity or Feature name."
+                )
+
        self.name = name
        self.entities = entities
        self.features = features
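A sketch of the new FeatureView check: a field mapping may not rename a source column to the name of one of the view's entities or features. As above, SimpleNamespace stands in for the real Entity/Feature classes, which are outside this diff, and FeatureView may take further arguments not shown here.

from datetime import datetime
from types import SimpleNamespace

from feast.big_query_source import BigQuerySource
from feast.feature_view import FeatureView

source = BigQuerySource(
    event_timestamp_column="event_timestamp",
    table_ref="my_project.my_dataset.driver_stats",  # hypothetical table
    field_mapping={"driver_id": "driver"},  # source column renamed to "driver"
)

# "driver_id" is both an entity name and a field_mapping key, so this raises:
# ValueError: The field driver_id is mapped to driver for this data source. ...
FeatureView(
    name="driver_stats",
    entities=[SimpleNamespace(name="driver_id")],
    features=[SimpleNamespace(name="trips_today")],
    inputs=source,
    feature_start_time=datetime(2021, 1, 1),
)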
146 changes: 146 additions & 0 deletions sdk/python/feast/offline_store.py
@@ -0,0 +1,146 @@
# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Optional, Tuple

import pyarrow

from feast.feature_view import FeatureView


class OfflineStore(ABC):
    """
    OfflineStore is an object used for all interaction between Feast and the service used for offline storage of features. Currently BigQuery is supported.
    """

    @staticmethod
    @abstractmethod
    def pull_latest_from_table(
        feature_view: FeatureView, start_date: datetime, end_date: datetime,
    ) -> Optional[pyarrow.Table]:
        pass


class BigQueryOfflineStore(OfflineStore):
    @staticmethod
    def pull_latest_from_table(
        feature_view: FeatureView, start_date: datetime, end_date: datetime,
    ) -> pyarrow.Table:
        if feature_view.inputs.table_ref is None:
            raise ValueError(
                "This function can only be called on a FeatureView with a table_ref"
            )

        (
            entity_names,
            feature_names,
            event_timestamp_column,
            created_timestamp_column,
        ) = run_reverse_field_mapping(feature_view)

        partition_by_entity_string = ", ".join(entity_names)
        if partition_by_entity_string != "":
            partition_by_entity_string = "PARTITION BY " + partition_by_entity_string
        timestamps = [event_timestamp_column]
        if created_timestamp_column is not None:
            timestamps.append(created_timestamp_column)
        timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
        field_string = ", ".join(entity_names + feature_names + timestamps)

        query = f"""
        SELECT {field_string}
        FROM (
            SELECT {field_string},
            ROW_NUMBER() OVER({partition_by_entity_string} ORDER BY {timestamp_desc_string}) AS _feast_row
            FROM `{feature_view.inputs.table_ref}`
            WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
        )
        WHERE _feast_row = 1
        """

        table = BigQueryOfflineStore._pull_query(query)
        table = run_forward_field_mapping(table, feature_view)
        return table

    @staticmethod
    def _pull_query(query: str) -> pyarrow.Table:
        from google.cloud import bigquery

        client = bigquery.Client()
        query_job = client.query(query)
        return query_job.to_arrow()


def run_reverse_field_mapping(
    feature_view: FeatureView,
) -> Tuple[List[str], List[str], str, Optional[str]]:
    """
    If a field mapping exists, run it in reverse on the entity names, feature names, event timestamp column, and created timestamp column to get the names of the relevant columns in the BigQuery table.

    Args:
        feature_view: FeatureView object containing the field mapping as well as the names to reverse-map.
    Returns:
        Tuple containing the list of reverse-mapped entity names, reverse-mapped feature names, reverse-mapped event timestamp column, and reverse-mapped created timestamp column that will be passed into the query to the offline store.
    """
    # if we have mapped fields, use the original field names in the call to the offline store
    event_timestamp_column = feature_view.inputs.event_timestamp_column
    entity_names = [entity.name for entity in feature_view.entities]
    feature_names = [feature.name for feature in feature_view.features]
    created_timestamp_column = feature_view.inputs.created_timestamp_column
    if feature_view.inputs.field_mapping is not None:
        reverse_field_mapping = {
            v: k for k, v in feature_view.inputs.field_mapping.items()
        }
        event_timestamp_column = (
            reverse_field_mapping[event_timestamp_column]
            if event_timestamp_column in reverse_field_mapping.keys()
            else event_timestamp_column
        )
        created_timestamp_column = (
            reverse_field_mapping[created_timestamp_column]
            if created_timestamp_column is not None
            and created_timestamp_column in reverse_field_mapping.keys()
            else created_timestamp_column
        )
        entity_names = [
            reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
            for col in entity_names
        ]
        feature_names = [
            reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
            for col in feature_names
        ]
    return (
        entity_names,
        feature_names,
        event_timestamp_column,
        created_timestamp_column,
    )


def run_forward_field_mapping(
    table: pyarrow.Table, feature_view: FeatureView
) -> pyarrow.Table:
    # run field mapping in the forward direction
    if table is not None and feature_view.inputs.field_mapping is not None:
        cols = table.column_names
        mapped_cols = [
            feature_view.inputs.field_mapping[col]
            if col in feature_view.inputs.field_mapping.keys()
            else col
            for col in cols
        ]
        table = table.rename_columns(mapped_cols)
    return table
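To illustrate the two helpers above: run_reverse_field_mapping translates the view's (post-mapping) names back to the physical BigQuery column names before the query is built, and run_forward_field_mapping renames the downloaded pyarrow columns back. A self-contained sketch with a stand-in object that exposes only the attributes the helpers read (a real FeatureView would be used in practice):

from types import SimpleNamespace

import pyarrow as pa

from feast.offline_store import run_forward_field_mapping, run_reverse_field_mapping

# Stand-in mirroring the attributes accessed above; field_mapping maps
# raw BigQuery column names to the names exposed by the feature view.
view = SimpleNamespace(
    entities=[SimpleNamespace(name="driver_id")],
    features=[SimpleNamespace(name="trips_today")],
    inputs=SimpleNamespace(
        event_timestamp_column="event_timestamp",
        created_timestamp_column=None,
        field_mapping={"ts": "event_timestamp", "trips": "trips_today"},
        table_ref="my_project.my_dataset.driver_stats",
    ),
)

entity_names, feature_names, event_ts, created_ts = run_reverse_field_mapping(view)
print(entity_names, feature_names, event_ts, created_ts)
# ['driver_id'] ['trips'] ts None  -> raw column names used to build the SQL

raw = pa.table({"driver_id": [1], "trips": [3], "ts": [1615500000]})
renamed = run_forward_field_mapping(raw, view)
print(renamed.column_names)
# ['driver_id', 'trips_today', 'event_timestamp']

With these raw names, pull_latest_from_table builds a ROW_NUMBER() OVER (PARTITION BY entity ... ORDER BY timestamps DESC) query so that only the newest row per entity between start_date and end_date is returned.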
1 change: 0 additions & 1 deletion sdk/python/requirements-dev.txt
@@ -2,7 +2,6 @@ Click==7.*
google-api-core==1.22.4
google-auth==1.22.1
google-cloud-bigquery==1.18
-google-cloud-bigquery-storage==0.7.0
google-cloud-dataproc==2.0.2
google-cloud-storage==1.20.0
google-resumable-media>=0.5
1 change: 0 additions & 1 deletion sdk/python/setup.py
@@ -31,7 +31,6 @@
    "google-cloud-storage==1.20.*",
    "google-cloud-core==1.4.*",
    "googleapis-common-protos==1.52.*",
-    "google-cloud-bigquery-storage==0.7.*",
    "grpcio==1.31.0",
    "pandas~=1.0.0",
    "pandavro==1.5.*",