Skip to content

Commit

Permalink
fix: CGO Memory leak issue in GO Feature server (feast-dev#4291)
Browse files Browse the repository at this point in the history
  • Loading branch information
EXPEbdodla committed Jun 29, 2024
1 parent 2c38946 commit 43e198f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
24 changes: 20 additions & 4 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"net/http"
"time"
)

type httpServer struct {
Expand Down Expand Up @@ -210,15 +212,15 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
"results": results,
}

w.Header().Set("Content-Type", "application/json")

err = json.NewEncoder(w).Encode(response)

if err != nil {
http.Error(w, fmt.Sprintf("Error encoding response: %+v", err), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")

if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
logger, err := s.loggingService.GetOrCreateLogger(featureService)
if err != nil {
Expand Down Expand Up @@ -250,18 +252,32 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
return
}
}
go releaseCGOMemory(featureVectors)
}

func releaseCGOMemory(featureVectors []*onlineserving.FeatureVector) {
for _, vector := range featureVectors {
vector.Values.Release()
}
}

func (s *httpServer) Serve(host string, port int) error {
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: nil}
http.HandleFunc("/get-online-features", s.getOnlineFeatures)
http.HandleFunc("/health", healthCheckHandler)
err := s.server.ListenAndServe()
// Don't return the error if it's caused by graceful shutdown using Stop()
if err == http.ErrServerClosed {
return nil
}
return err
}

func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Healthy")
}

func (s *httpServer) Stop() error {
if s.server != nil {
return s.server.Shutdown(context.Background())
Expand Down
12 changes: 12 additions & 0 deletions go/internal/feast/transformation/transformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ func AugmentResponseWithOnDemandTransforms(
for name, values := range requestData {
requestContextArrow[name], err = types.ProtoValuesToArrowArray(values.Val, arrowMemory, numRows)
if err != nil {
ReleaseArrowContext(requestContextArrow)
return nil, err
}
}

for name, values := range entityRows {
requestContextArrow[name], err = types.ProtoValuesToArrowArray(values.Val, arrowMemory, numRows)
if err != nil {
ReleaseArrowContext(requestContextArrow)
return nil, err
}
}
Expand All @@ -71,14 +73,24 @@ func AugmentResponseWithOnDemandTransforms(
fullFeatureNames,
)
if err != nil {
ReleaseArrowContext(requestContextArrow)
return nil, err
}
result = append(result, onDemandFeatures...)

ReleaseArrowContext(requestContextArrow)
}

return result, nil
}

func ReleaseArrowContext(requestContextArrow map[string]arrow.Array) {
// Release memory used by requestContextArrow
for _, arrowArray := range requestContextArrow {
arrowArray.Release()
}
}

func CallTransformations(
featureView *model.OnDemandFeatureView,
retrievedFeatures map[string]arrow.Array,
Expand Down

0 comments on commit 43e198f

Please sign in to comment.