Skip to content

Commit

Permalink
fix: Timestamp type stored in online store should be parsed with seco…
Browse files Browse the repository at this point in the history
…nds resolution

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Apr 21, 2022
1 parent 104155c commit 33ae245
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
24 changes: 12 additions & 12 deletions go/types/typeconversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func ProtoTypeToArrowType(sample *types.Value) (arrow.DataType, error) {
case *types.Value_DoubleListVal:
return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil
case *types.Value_UnixTimestampVal:
return arrow.FixedWidthTypes.Time64ns, nil
return arrow.FixedWidthTypes.Time32s, nil
case *types.Value_UnixTimestampListVal:
return arrow.ListOf(arrow.FixedWidthTypes.Time64ns), nil
return arrow.ListOf(arrow.FixedWidthTypes.Time32s), nil
default:
return nil,
fmt.Errorf("unsupported proto type in proto to arrow conversion: %s", sample.Val)
Expand Down Expand Up @@ -80,9 +80,9 @@ func ValueTypeEnumToArrowType(t types.ValueType_Enum) (arrow.DataType, error) {
case types.ValueType_DOUBLE_LIST:
return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil
case types.ValueType_UNIX_TIMESTAMP:
return arrow.FixedWidthTypes.Time64ns, nil
return arrow.FixedWidthTypes.Time32s, nil
case types.ValueType_UNIX_TIMESTAMP_LIST:
return arrow.ListOf(arrow.FixedWidthTypes.Time64ns), nil
return arrow.ListOf(arrow.FixedWidthTypes.Time32s), nil
default:
return nil,
fmt.Errorf("unsupported value type enum in enum to arrow type conversion: %s", t)
Expand Down Expand Up @@ -119,9 +119,9 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e
for _, v := range values {
fieldBuilder.Append(v.GetDoubleVal())
}
case *array.Time64Builder:
case *array.Time32Builder:
for _, v := range values {
fieldBuilder.Append(arrow.Time64(v.GetUnixTimestampVal()))
fieldBuilder.Append(arrow.Time32(v.GetUnixTimestampVal()))
}
case *array.ListBuilder:
for _, list := range values {
Expand Down Expand Up @@ -157,9 +157,9 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e
for _, v := range list.GetDoubleListVal().GetVal() {
valueBuilder.Append(v)
}
case *array.Time64Builder:
case *array.Time32Builder:
for _, v := range list.GetUnixTimestampListVal().GetVal() {
valueBuilder.Append(arrow.Time64(v))
valueBuilder.Append(arrow.Time32(v))
}
}
}
Expand Down Expand Up @@ -227,10 +227,10 @@ func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) {
}
values = append(values,
&types.Value{Val: &types.Value_BoolListVal{BoolListVal: &types.BoolList{Val: vals}}})
case arrow.FixedWidthTypes.Time64ns:
case arrow.FixedWidthTypes.Time32s:
vals := make([]int64, int(offsets[idx])-pos)
for j := pos; j < int(offsets[idx]); j++ {
vals[j-pos] = int64(listValues.(*array.Time64).Value(j))
vals[j-pos] = int64(listValues.(*array.Time32).Value(j))
}

values = append(values,
Expand Down Expand Up @@ -278,11 +278,11 @@ func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) {
values = append(values,
&types.Value{Val: &types.Value_StringVal{StringVal: arr.(*array.String).Value(idx)}})
}
case arrow.FixedWidthTypes.Time64ns:
case arrow.FixedWidthTypes.Time32s:
for idx := 0; idx < arr.Len(); idx++ {
values = append(values,
&types.Value{Val: &types.Value_UnixTimestampVal{
UnixTimestampVal: int64(arr.(*array.Time64).Value(idx))}})
UnixTimestampVal: int64(arr.(*array.Time32).Value(idx))}})
}
default:
return nil, fmt.Errorf("unsupported arrow to proto conversion for type %s", arr.DataType())
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
pa.bool_(): "bool_val",
pa.string(): "string_val",
pa.binary(): "bytes_val",
pa.time64("ns"): "unix_timestamp_val",
pa.time32("s"): "unix_timestamp_val",
}

ARROW_LIST_TYPE_TO_PROTO_FIELD = {
Expand All @@ -42,7 +42,7 @@
pa.bool_(): "bool_list_val",
pa.string(): "string_list_val",
pa.binary(): "bytes_list_val",
pa.time64("ns"): "unix_timestamp_list_val",
pa.time32("s"): "unix_timestamp_list_val",
}

ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = {
Expand All @@ -53,7 +53,7 @@
pa.bool_(): Value_pb2.BoolList,
pa.string(): Value_pb2.StringList,
pa.binary(): Value_pb2.BytesList,
pa.time64("ns"): Value_pb2.Int64List,
pa.time32("s"): Value_pb2.Int64List,
}

# used for entity types only
Expand Down Expand Up @@ -270,8 +270,8 @@ def record_batch_to_online_response(record_batch):
proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[field.type.value_type]

column = record_batch.columns[idx]
if field.type.value_type == pa.time64("ns"):
column = column.cast(pa.list_(pa.int64()))
if field.type.value_type == pa.time32("s"):
column = column.cast(pa.list_(pa.int32()))

for v in column.tolist():
feature_vector.values.append(
Expand All @@ -281,8 +281,8 @@ def record_batch_to_online_response(record_batch):
proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[field.type]

column = record_batch.columns[idx]
if field.type == pa.time64("ns"):
column = column.cast(pa.int64())
if field.type == pa.time32("s"):
column = column.cast(pa.int32())

for v in column.tolist():
feature_vector.values.append(
Expand Down

0 comments on commit 33ae245

Please sign in to comment.