diff --git a/go/cmd/server/main.go b/go/cmd/server/main.go index ae0f4dae2e..c0e735a422 100644 --- a/go/cmd/server/main.go +++ b/go/cmd/server/main.go @@ -17,10 +17,6 @@ const ( feastServerVersion = "0.18.0" ) -func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { - return 0 -} - // TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus func main() { repoPath := os.Getenv(flagFeastRepoPath) @@ -45,7 +41,7 @@ func main() { } log.Println("Initializing feature store...") - fs, err := feast.NewFeatureStore(repoConfig, dummyTransformCallback) + fs, err := feast.NewFeatureStore(repoConfig, nil) if err != nil { log.Fatalln(err) } diff --git a/go/cmd/server/server_test.go b/go/cmd/server/server_test.go index 7e19600b38..e5e448a465 100644 --- a/go/cmd/server/server_test.go +++ b/go/cmd/server/server_test.go @@ -41,7 +41,7 @@ func getClient(ctx context.Context, basePath string) (serving.ServingServiceClie if err != nil { panic(err) } - fs, err := feast.NewFeatureStore(config, dummyTransformCallback) + fs, err := feast.NewFeatureStore(config, nil) if err != nil { panic(err) } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 6f033b6ab9..3b196bde3a 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -19,10 +19,10 @@ import ( ) type FeatureStore struct { - config *RepoConfig - registry *Registry - onlineStore OnlineStore - tranformationCallback TransformationCallback + config *RepoConfig + registry *Registry + onlineStore OnlineStore + transformationCallback TransformationCallback } // A Features struct specifies a list of features to be retrieved from the online store. These features @@ -86,10 +86,10 @@ func NewFeatureStore(config *RepoConfig, callback TransformationCallback) (*Feat registry.initializeRegistry() return &FeatureStore{ - config: config, - registry: registry, - onlineStore: onlineStore, - tranformationCallback: callback, + config: config, + registry: registry, + onlineStore: onlineStore, + transformationCallback: callback, }, nil } @@ -189,22 +189,23 @@ func (fs *FeatureStore) GetOnlineFeatures( result = append(result, vectors...) } - onDemandFeatures, err := augmentResponseWithOnDemandTransforms( - requestedOnDemandFeatureViews, - requestData, - joinKeyToEntityValues, - result, - fs.tranformationCallback, - arrowMemory, - numRows, - fullFeatureNames, - ) - if err != nil { - return nil, err + if fs.transformationCallback != nil { + onDemandFeatures, err := augmentResponseWithOnDemandTransforms( + requestedOnDemandFeatureViews, + requestData, + joinKeyToEntityValues, + result, + fs.transformationCallback, + arrowMemory, + numRows, + fullFeatureNames, + ) + if err != nil { + return nil, err + } + result = append(result, onDemandFeatures...) } - result = append(result, onDemandFeatures...) - result, err = keepOnlyRequestedFeatures(result, featureRefs, featureService, fullFeatureNames) if err != nil { return nil, err diff --git a/go/internal/feast/featurestore_test.go b/go/internal/feast/featurestore_test.go index 7c393b3d40..6fdf4fd3b6 100644 --- a/go/internal/feast/featurestore_test.go +++ b/go/internal/feast/featurestore_test.go @@ -22,10 +22,6 @@ func getRegistryPath() map[string]interface{} { return registry } -func dummyTransformCallback(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { - return 0 -} - func TestNewFeatureStore(t *testing.T) { t.Skip("@todo(achals): feature_repo isn't checked in yet") config := RepoConfig{ @@ -36,7 +32,7 @@ func TestNewFeatureStore(t *testing.T) { "type": "redis", }, } - fs, err := NewFeatureStore(&config, dummyTransformCallback) + fs, err := NewFeatureStore(&config, nil) assert.Nil(t, err) assert.IsType(t, &RedisOnlineStore{}, fs.onlineStore) } @@ -62,7 +58,7 @@ func TestGetOnlineFeaturesRedis(t *testing.T) { {Val: &types.Value_Int64Val{Int64Val: 1003}}}}, } - fs, err := NewFeatureStore(&config, dummyTransformCallback) + fs, err := NewFeatureStore(&config, nil) assert.Nil(t, err) ctx := context.Background() response, err := fs.GetOnlineFeatures( diff --git a/go/internal/feast/transformation.go b/go/internal/feast/transformation.go index d89e241c04..00cc0b4236 100644 --- a/go/internal/feast/transformation.go +++ b/go/internal/feast/transformation.go @@ -15,6 +15,12 @@ import ( "unsafe" ) +/* + TransformationCallback is a Python callback function's expected signature. + The function should accept name of the on demand feature view and pointers to input & output record batches. + Each record batch is being passed as two pointers: pointer to array (data) and pointer to schema. + Python function is expected to return number of rows added to the output record batch. +*/ type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int func augmentResponseWithOnDemandTransforms(