Skip to content

Commit

Permalink
use nil instead of dummy implementation for transformation callback
Browse files Browse the repository at this point in the history
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Apr 8, 2022
1 parent 92fe8fa commit a5e68a8
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 34 deletions.
6 changes: 1 addition & 5 deletions go/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ const (
feastServerVersion = "0.18.0"
)

func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int {
return 0
}

// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus
func main() {
repoPath := os.Getenv(flagFeastRepoPath)
Expand All @@ -45,7 +41,7 @@ func main() {
}

log.Println("Initializing feature store...")
fs, err := feast.NewFeatureStore(repoConfig, dummyTransformCallback)
fs, err := feast.NewFeatureStore(repoConfig, nil)
if err != nil {
log.Fatalln(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func getClient(ctx context.Context, basePath string) (serving.ServingServiceClie
if err != nil {
panic(err)
}
fs, err := feast.NewFeatureStore(config, dummyTransformCallback)
fs, err := feast.NewFeatureStore(config, nil)
if err != nil {
panic(err)
}
Expand Down
45 changes: 23 additions & 22 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
)

type FeatureStore struct {
config *RepoConfig
registry *Registry
onlineStore OnlineStore
tranformationCallback TransformationCallback
config *RepoConfig
registry *Registry
onlineStore OnlineStore
transformationCallback TransformationCallback
}

// A Features struct specifies a list of features to be retrieved from the online store. These features
Expand Down Expand Up @@ -86,10 +86,10 @@ func NewFeatureStore(config *RepoConfig, callback TransformationCallback) (*Feat
registry.initializeRegistry()

return &FeatureStore{
config: config,
registry: registry,
onlineStore: onlineStore,
tranformationCallback: callback,
config: config,
registry: registry,
onlineStore: onlineStore,
transformationCallback: callback,
}, nil
}

Expand Down Expand Up @@ -189,22 +189,23 @@ func (fs *FeatureStore) GetOnlineFeatures(
result = append(result, vectors...)
}

onDemandFeatures, err := augmentResponseWithOnDemandTransforms(
requestedOnDemandFeatureViews,
requestData,
joinKeyToEntityValues,
result,
fs.tranformationCallback,
arrowMemory,
numRows,
fullFeatureNames,
)
if err != nil {
return nil, err
if fs.transformationCallback != nil {
onDemandFeatures, err := augmentResponseWithOnDemandTransforms(
requestedOnDemandFeatureViews,
requestData,
joinKeyToEntityValues,
result,
fs.transformationCallback,
arrowMemory,
numRows,
fullFeatureNames,
)
if err != nil {
return nil, err
}
result = append(result, onDemandFeatures...)
}

result = append(result, onDemandFeatures...)

result, err = keepOnlyRequestedFeatures(result, featureRefs, featureService, fullFeatureNames)
if err != nil {
return nil, err
Expand Down
8 changes: 2 additions & 6 deletions go/internal/feast/featurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ func getRegistryPath() map[string]interface{} {
return registry
}

func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int {
return 0
}

func TestNewFeatureStore(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
config := RepoConfig{
Expand All @@ -36,7 +32,7 @@ func TestNewFeatureStore(t *testing.T) {
"type": "redis",
},
}
fs, err := NewFeatureStore(&config, dummyTransformCallback)
fs, err := NewFeatureStore(&config, nil)
assert.Nil(t, err)
assert.IsType(t, &RedisOnlineStore{}, fs.onlineStore)
}
Expand All @@ -62,7 +58,7 @@ func TestGetOnlineFeaturesRedis(t *testing.T) {
{Val: &types.Value_Int64Val{Int64Val: 1003}}}},
}

fs, err := NewFeatureStore(&config, dummyTransformCallback)
fs, err := NewFeatureStore(&config, nil)
assert.Nil(t, err)
ctx := context.Background()
response, err := fs.GetOnlineFeatures(
Expand Down
6 changes: 6 additions & 0 deletions go/internal/feast/transformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"unsafe"
)

/*
TransformationCallback is a Python callback function's expected signature.
The function should accept name of the on demand feature view and pointers to input & output record batches.
Each record batch is being passed as two pointers: pointer to array (data) and pointer to schema.
Python function is expected to return number of rows added to the output record batch.
*/
type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int

func augmentResponseWithOnDemandTransforms(
Expand Down

0 comments on commit a5e68a8

Please sign in to comment.