From f94120bf9f7040a678f207076bb73d73cab7b7cc Mon Sep 17 00:00:00 2001 From: Ivy Gooch Date: Fri, 20 Oct 2023 20:20:13 +0000 Subject: [PATCH] Implements UpdateList, AddValue, and RemoveValue in the SDK Server --- pkg/apis/agones/v1/gameserver.go | 6 +- pkg/apis/agones/v1/gameserver_test.go | 2 +- pkg/sdkserver/sdkserver.go | 235 ++++++++++++++++++++++++-- sdks/go/alpha.go | 34 ++++ 4 files changed, 261 insertions(+), 16 deletions(-) diff --git a/pkg/apis/agones/v1/gameserver.go b/pkg/apis/agones/v1/gameserver.go index 3b0c52ae08..f8e111958c 100644 --- a/pkg/apis/agones/v1/gameserver.go +++ b/pkg/apis/agones/v1/gameserver.go @@ -936,7 +936,7 @@ func (gs *GameServer) AppendListValues(name string, values []string) error { return errors.Errorf("unable to AppendListValues: Name %s, Values %s. No values to append", name, values) } if list, ok := gs.Status.Lists[name]; ok { - mergedList := mergeRemoveDuplicates(list.Values, values) + mergedList := MergeRemoveDuplicates(list.Values, values) // TODO: Truncate and apply up to cutoff if len(mergedList) > int(list.Capacity) { return errors.Errorf("unable to AppendListValues: Name %s, Values %s. Appended list length %d exceeds list capacity %d", name, values, len(mergedList), list.Capacity) @@ -953,10 +953,10 @@ func (gs *GameServer) AppendListValues(name string, values []string) error { return errors.Errorf("unable to AppendListValues: Name %s, Values %s. List not found in GameServer %s", name, values, gs.ObjectMeta.GetName()) } -// mergeRemoveDuplicates merges two lists and removes any duplicate values. +// MergeRemoveDuplicates merges two lists and removes any duplicate values. // Maintains ordering, so new values from list2 are appended to the end of list1. // Returns a new list with unique values only. -func mergeRemoveDuplicates(list1 []string, list2 []string) []string { +func MergeRemoveDuplicates(list1 []string, list2 []string) []string { uniqueList := []string{} listMap := make(map[string]bool) for _, v1 := range list1 { diff --git a/pkg/apis/agones/v1/gameserver_test.go b/pkg/apis/agones/v1/gameserver_test.go index f40a903153..785ae8a6f6 100644 --- a/pkg/apis/agones/v1/gameserver_test.go +++ b/pkg/apis/agones/v1/gameserver_test.go @@ -2054,7 +2054,7 @@ func TestMergeRemoveDuplicates(t *testing.T) { for test, testCase := range testCases { t.Run(test, func(t *testing.T) { - got := mergeRemoveDuplicates(testCase.str1, testCase.str2) + got := MergeRemoveDuplicates(testCase.str1, testCase.str2) assert.Equal(t, testCase.want, got) }) } diff --git a/pkg/sdkserver/sdkserver.go b/pkg/sdkserver/sdkserver.go index 06ad2fe9f7..a78ea3331e 100644 --- a/pkg/sdkserver/sdkserver.go +++ b/pkg/sdkserver/sdkserver.go @@ -61,6 +61,7 @@ const ( updatePlayerCapacity Operation = "updatePlayerCapacity" updateConnectedPlayers Operation = "updateConnectedPlayers" updateCounters Operation = "updateCounters" + updateLists Operation = "updateLists" updatePeriod time.Duration = time.Second ) @@ -81,6 +82,15 @@ type counterUpdateRequest struct { counter agonesv1.CounterStatus } +type listUpdateRequest struct { + // Capacity of the List as set by capacitySet. + capacitySet *int64 + // Values to add to the List + listAppend []string + // Values to remove from the List + listDelete []string +} + // SDKServer is a gRPC server, that is meant to be a sidecar // for a GameServer that will update the game server status on SDK requests // @@ -118,6 +128,7 @@ type SDKServer struct { gsPlayerCapacity int64 gsConnectedPlayers []string gsCounterUpdates map[string]counterUpdateRequest + gsListUpdates map[string]listUpdateRequest gsCopy *agonesv1.GameServer } @@ -160,6 +171,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { // Once FeatureCountsAndLists is in GA, move this into SDKServer creation above. s.gsCounterUpdates = map[string]counterUpdateRequest{} + s.gsListUpdates = map[string]listUpdateRequest{} } s.informerFactory = factory @@ -332,6 +344,8 @@ func (s *SDKServer) syncGameServer(ctx context.Context, key string) error { return s.updateConnectedPlayers(ctx) case updateCounters: return s.updateCounter(ctx) + case updateLists: + return s.updateList(ctx) } return errors.Errorf("could not sync game server key: %s", key) @@ -978,48 +992,245 @@ func (s *SDKServer) updateCounter(ctx context.Context) error { return nil } -// GetList returns a List. Returns NOT_FOUND if the List does not exist. +// GetList returns a List. Returns not found if the List does not exist. // [Stage:Alpha] // [FeatureFlag:CountsAndLists] func (s *SDKServer) GetList(ctx context.Context, in *alpha.GetListRequest) (*alpha.List, error) { if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists) } - // TODO(#2716): Implement me - return nil, errors.Errorf("Unimplemented -- GetList coming soon") + s.logger.WithField("name", in.Name).Debug("Getting List") + + gs, err := s.gameServer() + if err != nil { + return nil, err + } + + s.gsUpdateMutex.RLock() + defer s.gsUpdateMutex.RUnlock() + + list, ok := gs.Status.Lists[in.Name] + if !ok { + return nil, errors.Errorf("list not found: %s", in.Name) + } + + s.logger.WithField("Get List", list).Debugf("Got List %s", in.Name) + protoList := alpha.List{Name: in.Name, Values: list.Values, Capacity: list.Capacity} + // If there are batched changes that have not yet been applied, apply them to the List. + // This does NOT validate batched the changes. + if listUpdate, ok := s.gsListUpdates[in.Name]; ok { + if listUpdate.capacitySet != nil { + protoList.Capacity = *listUpdate.capacitySet + } + // TODO: DELETE + // if len(listUpdate.listDelete) != 0 { + // } + // TODO: APPEND + // if len(listUpdate.listAppend) != 0 { + // } + // TODO: Truncate if past capacity + s.logger.WithField("Get List", list).Debugf("Applied Batched List Updates %v", listUpdate) + } + + return &protoList, nil } -// UpdateList returns the updated List. +// UpdateList collapses all update capacity requests for a given List into a single UpdateList request. +// This function currently only updates the Capacity of a List. +// Returns error if the List does not exist (name cannot be updated). +// Returns error if the List update capacity is out of range [0,1000]. // [Stage:Alpha] // [FeatureFlag:CountsAndLists] func (s *SDKServer) UpdateList(ctx context.Context, in *alpha.UpdateListRequest) (*alpha.List, error) { if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists) } - // TODO(#2716): Implement Me - return nil, errors.Errorf("Unimplemented -- UpdateList coming soon") + + if in.List == nil || in.UpdateMask == nil { + return nil, errors.Errorf("invalid argument. List: %v and UpdateMask %v cannot be nil", in.List, in.UpdateMask) + } + + name := in.List.Name + s.logger.WithField("name", name).Debug("Update List -- Currently only used for Updating Capacity") + + s.gsUpdateMutex.Lock() + defer s.gsUpdateMutex.Unlock() + + gs, err := s.gameServer() + if err != nil { + return nil, err + } + + // TODO: Pull in variable Max Capacity from CRD instead of hard-coded number here. + if in.List.Capacity < 0 || in.List.Capacity > 1000 { + return nil, errors.Errorf("out of range. Capacity must be within range [0,1000]. Found Capacity: %d", in.List.Capacity) + } + + if _, ok := gs.Status.Lists[name]; ok { + batchList := s.gsListUpdates[name] + batchList.capacitySet = &in.List.Capacity + s.gsListUpdates[name] = batchList + // Queue up the Update for later batch processing by updateLists. + s.workerqueue.Enqueue(cache.ExplicitKey(updateLists)) + return &alpha.List{}, nil + } + return nil, errors.Errorf("not found. %s List not found", name) } -// AddListValue returns the updated List. +// AddListValue collapses all append a value to the end of a List requests into a single UpdateList request. +// Returns not found if the List does not exist. +// Returns already exists if the value is already in the List. +// Returns out of range if the List is already at Capacity. // [Stage:Alpha] // [FeatureFlag:CountsAndLists] func (s *SDKServer) AddListValue(ctx context.Context, in *alpha.AddListValueRequest) (*alpha.List, error) { if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists) } - // TODO(#2716): Implement Me - return nil, errors.Errorf("Unimplemented -- AddListValue coming soon") + s.logger.WithField("name", in.Name).Debug("Add List Value") + + s.gsUpdateMutex.Lock() + defer s.gsUpdateMutex.Unlock() + + gs, err := s.gameServer() + if err != nil { + return nil, err + } + + if list, ok := gs.Status.Lists[in.Name]; ok { + batchList := s.gsListUpdates[in.Name] + // Verify room to add another value + var capacity int64 + if batchList.capacitySet != nil { + capacity = *batchList.capacitySet + } else { + capacity = int64(len(list.Values) + len(batchList.listAppend) - len(batchList.listDelete)) + } + if list.Capacity <= capacity { + return nil, errors.Errorf("out of range. No available capacity. Current Capacity: %d, List Size: %d", list.Capacity, len(list.Values)) + } + // Verify value does not already exist in the list + // TODO: This does not check batched but not yet applied append / remove values. Should we do this? + // (Easy to check not yet applied values, hard to check removed and re-added values.) I'm + // thinking this would be better / easier to do as part of the batch apply update to list, and + // not verify here. + for _, val := range list.Values { + if in.Value == val { + return nil, errors.Errorf("already exists. Value: %s already in List: %s", in.Value, in.Name) + } + } + batchList.listAppend = append(batchList.listAppend, in.Value) + s.gsListUpdates[in.Name] = batchList + // Queue up the Update for later batch processing by updateLists. + s.workerqueue.Enqueue(cache.ExplicitKey(updateLists)) + return &alpha.List{}, nil + } + return nil, errors.Errorf("not found. %s List not found", in.Name) } -// RemoveListValue returns the updated List. +// RemoveListValue collapses all remove a value from a List requests into a single UpdateList request. +// Returns not found if the List does not exist. +// Returns not found if the value is not in the List. // [Stage:Alpha] // [FeatureFlag:CountsAndLists] func (s *SDKServer) RemoveListValue(ctx context.Context, in *alpha.RemoveListValueRequest) (*alpha.List, error) { if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists) } - // TODO(#2716): Implement Me - return nil, errors.Errorf("Unimplemented -- RemoveListValue coming soon") + + s.logger.WithField("name", in.Name).Debug("Remove List Value") + + s.gsUpdateMutex.Lock() + defer s.gsUpdateMutex.Unlock() + + gs, err := s.gameServer() + if err != nil { + return nil, err + } + + if list, ok := gs.Status.Lists[in.Name]; ok { + // Verify value exists in the list + for _, val := range list.Values { + if in.Value != val { + continue + } + // Add value to remove to gsListUpdates map. + batchList := s.gsListUpdates[in.Name] + batchList.listDelete = append(batchList.listDelete, in.Value) + s.gsListUpdates[in.Name] = batchList + // Queue up the Update for later batch processing by updateLists. + s.workerqueue.Enqueue(cache.ExplicitKey(updateLists)) + return &alpha.List{}, nil + } + return nil, errors.Errorf("not found. Value: %s not found in List: %s", in.Value, in.Name) + } + return nil, errors.Errorf("not found. %s List not found", in.Name) +} + +// updateList updates the Lists in the GameServer's Status with the batched update list requests. +// Includes all SetCapacity, AddValue, and RemoveValue requests in the batched request. +func (s *SDKServer) updateList(ctx context.Context) error { + gs, err := s.gameServer() + if err != nil { + return err + } + gsCopy := gs.DeepCopy() + + s.logger.WithField("batchListUpdates", s.gsListUpdates).Debug("Batch updating List(s)") + s.gsUpdateMutex.Lock() + defer s.gsUpdateMutex.Unlock() + + names := []string{} + + for name, listReq := range s.gsListUpdates { + list, ok := gsCopy.Status.Lists[name] + if !ok { + continue + } + if listReq.capacitySet != nil { + list.Capacity = *listReq.capacitySet + } + if len(listReq.listDelete) != 0 { + // TODO: Is there a quicker way of doing this while still keeping the ordering in tact? + for _, dltVal := range listReq.listDelete { + for i, val := range list.Values { + if dltVal == val { + // Remove value (maintains list ordering and modifies underlying gameserverstatus List.Values array). + list.Values = append(list.Values[:i], list.Values[i+1:]...) + } + } + } + } + if len(listReq.listAppend) != 0 { + list.Values = agonesv1.MergeRemoveDuplicates(list.Values, listReq.listAppend) + } + + if int64(len(list.Values)) > list.Capacity { + s.logger.Debugf("truncating Values in Update List request to List Capacity %d", list.Capacity) + list.Values = append([]string{}, list.Values[:list.Capacity]...) + } + gsCopy.Status.Lists[name] = list + names = append(names, name) + } + + gs, err = s.gameServerGetter.GameServers(s.namespace).Update(ctx, gsCopy, metav1.UpdateOptions{}) + if err != nil { + return err + } + + // Record an event per List update + for _, name := range names { + s.recorder.Event(gs, corev1.EventTypeNormal, "UpdateList", fmt.Sprintf("List %s updated", name)) + s.logger.Debugf("List %s updated to List Capacity: %d, Values: %v", + name, gs.Status.Lists[name].Capacity, gs.Status.Lists[name].Values) + } + + // Cache a copy of the successfully updated gameserver + s.gsCopy = gs + // Clear the gsCounterUpdates + s.gsCounterUpdates = map[string]counterUpdateRequest{} + + return nil } // sendGameServerUpdate sends a watch game server event diff --git a/sdks/go/alpha.go b/sdks/go/alpha.go index b7c6a05258..0af7d64219 100644 --- a/sdks/go/alpha.go +++ b/sdks/go/alpha.go @@ -20,6 +20,7 @@ import ( "agones.dev/agones/pkg/sdk/alpha" "github.com/pkg/errors" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -172,3 +173,36 @@ func (a *Alpha) SetCounterCapacity(key string, amount int64) (bool, error) { } return true, err } + +// Alpha().ListAppend(key, value): boolean +// Alpha().ListDelete(key, value): boolean + +// GetListCapacity returns the Capacity for a List, given the List's key (name). +// Will error if the key was not predefined in the GameServer resource on creation. +func (a *Alpha) GetListCapacity(key string) (int64, error) { + list, err := a.client.GetList(context.Background(), &alpha.GetListRequest{Name: key}) + if err != nil { + return -1, errors.Wrapf(err, "could not get List %s capacity", key) + } + return list.Capacity, nil +} + +// SetListCapacity sets the capacity for a given list. Capacity must be between 0 and 1000. +// Will error if the key was not predefined in the GameServer resource on creation. +func (a *Alpha) SetListCapacity(key string, amount int64) (bool, error) { + _, err := a.client.UpdateList(context.Background(), &alpha.UpdateListRequest{ + List: &alpha.List{ + Name: key, + Capacity: amount, + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"capacity"}}, + }) + if err != nil { + return false, errors.Wrapf(err, "could not set List %s capacity to amount %d", key, amount) + } + return true, err +} + +// Alpha().ListContains(key, value): boolean +// Alpha().ListLength(key, value): integer +// Alpha().ListGet(key): []string