Skip to content

Commit

Permalink
Add ValueType.NULL (#1893)
Browse files Browse the repository at this point in the history
Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
  • Loading branch information
judahrand authored Sep 22, 2021
1 parent 7afa987 commit e893e7f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 25 deletions.
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def upload_df_to_redshift(
column_names, column_types = [], []
for field in table.schema:
column_names.append(field.name)
column_types.append(pa_to_redshift_value_type(str(field.type)))
column_types.append(pa_to_redshift_value_type(field.type))
column_query_list = ", ".join(
[
f"{column_name} {column_type}"
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from collections import defaultdict
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, cast

import pandas as pd

Expand All @@ -24,7 +24,7 @@
)
from feast.protos.feast.types.Value_pb2 import Value as Value
from feast.type_map import (
_proto_str_to_value_type,
_proto_value_to_value_type,
_python_value_to_proto_value,
feast_value_type_to_python_type,
python_values_to_feast_value_type,
Expand Down Expand Up @@ -96,14 +96,14 @@ def _infer_online_entity_rows(

entity_rows_dicts = cast(List[Dict[str, Any]], entity_rows)
entity_row_list = []
entity_type_map: Dict[str, Optional[ValueType]] = dict()
entity_type_map: Dict[str, ValueType] = dict()
entity_python_values_map = defaultdict(list)

# Flatten keys-value dicts into lists for type inference
for entity in entity_rows_dicts:
for key, value in entity.items():
if isinstance(value, Value):
inferred_type = _proto_str_to_value_type(str(value.WhichOneof("val")))
inferred_type = _proto_value_to_value_type(value)
# If any ProtoValues were present their types must all be the same
if key in entity_type_map and entity_type_map.get(key) != inferred_type:
raise TypeError(
Expand Down
58 changes: 38 additions & 20 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Type

import numpy as np
import pandas as pd
import pyarrow
from google.protobuf.json_format import MessageToDict
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
from google.protobuf.timestamp_pb2 import Timestamp

from feast.protos.feast.types.Value_pb2 import (
Expand Down Expand Up @@ -149,9 +151,9 @@ def python_type_to_feast_value_type(
common_item_value_type = None
for item in list_items:
if isinstance(item, ProtoValue):
current_item_value_type: Optional[
ValueType
] = _proto_str_to_value_type(str(item.WhichOneof("val")))
current_item_value_type: ValueType = _proto_value_to_value_type(
item
)
else:
# Get the type from the current item, only one level deep
current_item_value_type = python_type_to_feast_value_type(
Expand Down Expand Up @@ -183,7 +185,9 @@ def python_type_to_feast_value_type(
return type_map[value.dtype.__str__()]


def python_values_to_feast_value_type(name: str, values: Any, recurse: bool = True):
def python_values_to_feast_value_type(
name: str, values: Any, recurse: bool = True
) -> ValueType:
inferred_dtype = ValueType.UNKNOWN
for row in values:
current_dtype = python_type_to_feast_value_type(
Expand All @@ -193,11 +197,14 @@ def python_values_to_feast_value_type(name: str, values: Any, recurse: bool = Tr
if inferred_dtype is ValueType.UNKNOWN:
inferred_dtype = current_dtype
else:
if current_dtype != inferred_dtype and current_dtype != ValueType.UNKNOWN:
if current_dtype != inferred_dtype and current_dtype not in (
ValueType.UNKNOWN,
ValueType.NULL,
):
raise TypeError(
f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. "
)
if inferred_dtype is ValueType.UNKNOWN:
if inferred_dtype in (ValueType.UNKNOWN, ValueType.NULL):
raise ValueError(
f"field {name} cannot have all null values for type inference."
)
Expand All @@ -206,10 +213,12 @@ def python_values_to_feast_value_type(name: str, values: Any, recurse: bool = Tr


def _type_err(item, dtype):
raise ValueError(f'Value "{item}" is of type {type(item)} not of type {dtype}')
raise TypeError(f'Value "{item}" is of type {type(item)} not of type {dtype}')


PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[Any, Tuple[Any, str, List[Any]]] = {
PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[
ValueType, Tuple[GeneratedProtocolMessageType, str, List[Type]]
] = {
ValueType.FLOAT_LIST: (
FloatList,
"float_list_val",
Expand All @@ -233,7 +242,7 @@ def _type_err(item, dtype):
}

PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: Dict[
Any, Tuple[str, Any, Optional[Set[Any]]]
ValueType, Tuple[str, Any, Optional[Set[Type]]]
] = {
ValueType.INT32: ("int32_val", lambda x: int(x), None),
ValueType.INT64: ("int64_val", lambda x: int(x), None),
Expand All @@ -245,7 +254,7 @@ def _type_err(item, dtype):
}


def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
def _python_value_to_proto_value(feast_value_type: ValueType, value: Any) -> ProtoValue:
"""
Converts a Python (native, pandas) value to a Feast Proto Value based
on a provided value type
Expand All @@ -257,9 +266,12 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
Returns:
Feast Value Proto
"""

# Detect list type and handle separately
if "list" in feast_value_type.name.lower():
# Feature can be list but None is still valid
if value is None:
return ProtoValue()

if feast_value_type in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE:
proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[
feast_value_type
Expand All @@ -279,13 +291,15 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
else:
if pd.isnull(value):
return ProtoValue()
elif feast_value_type == ValueType.UNIX_TIMESTAMP:

if feast_value_type == ValueType.UNIX_TIMESTAMP:
if isinstance(value, datetime):
return ProtoValue(int64_val=int(value.timestamp()))
elif isinstance(value, Timestamp):
return ProtoValue(int64_val=int(value.ToSeconds()))
return ProtoValue(int64_val=int(value))
elif feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:

if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:
(
field_name,
func,
Expand All @@ -300,7 +314,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:


def python_value_to_proto_value(
value: Any, feature_type: ValueType = None
value: Any, feature_type: ValueType = ValueType.UNKNOWN
) -> ProtoValue:
value_type = feature_type
if value is not None:
Expand All @@ -315,7 +329,7 @@ def python_value_to_proto_value(
return _python_value_to_proto_value(value_type, value)


def _proto_str_to_value_type(proto_str: str) -> ValueType:
def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
"""
Returns Feast ValueType given Feast ValueType string.
Expand All @@ -325,6 +339,7 @@ def _proto_str_to_value_type(proto_str: str) -> ValueType:
Returns:
A variant of ValueType.
"""
proto_str = proto_value.WhichOneof("val")
type_map = {
"int32_val": ValueType.INT32,
"int64_val": ValueType.INT64,
Expand All @@ -340,6 +355,7 @@ def _proto_str_to_value_type(proto_str: str) -> ValueType:
"string_list_val": ValueType.STRING_LIST,
"bytes_list_val": ValueType.BYTES_LIST,
"bool_list_val": ValueType.BOOL_LIST,
None: ValueType.NULL,
}

return type_map[proto_str]
Expand All @@ -364,12 +380,13 @@ def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType:
"list<item: string>": ValueType.STRING_LIST,
"list<item: binary>": ValueType.BYTES_LIST,
"list<item: bool>": ValueType.BOOL_LIST,
"null": ValueType.NULL,
}
return type_map[pa_type_as_str]


def bq_to_feast_value_type(bq_type_as_str):
type_map: Dict[ValueType, Union[str, Dict[str, Any]]] = {
def bq_to_feast_value_type(bq_type_as_str: str) -> ValueType:
type_map: Dict[str, ValueType] = {
"DATETIME": ValueType.UNIX_TIMESTAMP,
"TIMESTAMP": ValueType.UNIX_TIMESTAMP,
"INTEGER": ValueType.INT64,
Expand All @@ -384,6 +401,7 @@ def bq_to_feast_value_type(bq_type_as_str):
"ARRAY<STRING>": ValueType.STRING_LIST,
"ARRAY<BYTES>": ValueType.BYTES_LIST,
"ARRAY<BOOL>": ValueType.BOOL_LIST,
"NULL": ValueType.NULL,
}

return type_map[bq_type_as_str]
Expand All @@ -409,10 +427,10 @@ def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType:
return type_map[redshift_type_as_str.lower()]


def pa_to_redshift_value_type(pa_type_as_str: str) -> str:
def pa_to_redshift_value_type(pa_type: pyarrow.DataType) -> str:
# PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html
# Redshift type: https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
pa_type_as_str = pa_type_as_str.lower()
pa_type_as_str = str(pa_type).lower()
if pa_type_as_str.startswith("timestamp"):
if "tz=" in pa_type_as_str:
return "timestamptz"
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/value_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ValueType(enum.Enum):
FLOAT_LIST = 16
BOOL_LIST = 17
UNIX_TIMESTAMP_LIST = 18
NULL = 19

def to_tfx_schema_feature_type(self):
if self.value in [
Expand Down

0 comments on commit e893e7f

Please sign in to comment.