From d550adb098b130fe421f649daee6a0ba949d0c80 Mon Sep 17 00:00:00 2001 From: Aaron Liang Date: Mon, 17 Apr 2023 20:06:32 +0000 Subject: [PATCH] Adding a more graceful termination for allocator --- build/Makefile | 19 + cmd/allocator/main.go | 34 +- .../agones/templates/service/allocation.yaml | 6 + install/helm/agones/values.yaml | 4 + install/yaml/install.yaml | 6 + pkg/util/signals/signals.go | 2 +- test/e2e/allocator/main_test.go | 92 +++++ test/e2e/allocator/pod_termination_test.go | 82 ++++ test/e2e/allocator_test.go | 268 ++----------- test/e2e/allochelper/helper_func.go | 368 ++++++++++++++++++ 10 files changed, 641 insertions(+), 240 deletions(-) create mode 100644 test/e2e/allocator/main_test.go create mode 100644 test/e2e/allocator/pod_termination_test.go create mode 100644 test/e2e/allochelper/helper_func.go diff --git a/build/Makefile b/build/Makefile index 07522ea44d..5a18926599 100644 --- a/build/Makefile +++ b/build/Makefile @@ -303,6 +303,7 @@ test-e2e: $(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-integration $(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-failure $(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-ha-extensions + $(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-allocator-crash # e2e test args: @@ -364,6 +365,24 @@ else endif echo "Finishing e2e extensions high availability test!" +test-e2e-allocator-crash: FEATURE_GATES ?= $(ALPHA_FEATURE_GATES) +test-e2e-allocator-crash: CLOUD_PRODUCT ?= generic +test-e2e-allocator-crash: GO_E2E_TEST_INTEGRATION_ARGS ?=\ + --cloud-product=$(CLOUD_PRODUCT) \ + --gameserver-image=$(GS_TEST_IMAGE) \ + --feature-gates=$(FEATURE_GATES) \ + --pullsecret=$(IMAGE_PULL_SECRET) +test-e2e-allocator-crash: $(ensure-build-image) echo "Starting e2e allocation pod deletion test!" +ifdef E2E_USE_GOTESTSUM + $(GOTESTSUM) --packages=$(agones_package)/test/e2e/allocator -- $(go_test_args) $(ARGS) -parallel=1 -args \ + $(GO_E2E_TEST_ARGS) $(GO_E2E_TEST_INTEGRATION_ARGS) +else + $(GO_TEST) $(ARGS) -parallel=1 $(agones_package)/test/e2e/allocator -args \ + $(GO_E2E_TEST_ARGS) $(GO_E2E_TEST_INTEGRATION_ARGS) +endif + echo "Finishing e2e allocation pod deletion test!"
+ # Runs end-to-end stress tests on the current configured cluster # For minikube user the minikube-stress-test-e2e targets stress-test-e2e: $(ensure-build-image) diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 4f18c3ee5e..97e56b894e 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -55,7 +55,10 @@ import ( "k8s.io/client-go/rest" ) -var logger = runtime.NewLoggerWithSource("main") +var ( + podReady bool + logger = runtime.NewLoggerWithSource("main") +) const ( certDir = "/home/allocator/client-ca/" @@ -77,6 +80,7 @@ const ( apiServerBurstQPSFlag = "api-server-qps-burst" logLevelFlag = "log-level" allocationBatchWaitTime = "allocation-batch-wait-time" + readinessShutdownDuration = "readiness-shutdown-duration" ) func parseEnvFlags() config { @@ -109,6 +113,7 @@ func parseEnvFlags() config { pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.") pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level") pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches") + pflag.Duration(readinessShutdownDuration, viper.GetDuration(readinessShutdownDuration), "Time for the SIGTERM/SIGINT handler to sleep before the allocator exits, once the readiness check starts failing.") runtime.FeaturesBindFlags() pflag.Parse() @@ -127,6 +132,7 @@ func parseEnvFlags() config { runtime.Must(viper.BindEnv(totalRemoteAllocationTimeoutFlag)) runtime.Must(viper.BindEnv(logLevelFlag)) runtime.Must(viper.BindEnv(allocationBatchWaitTime)) + runtime.Must(viper.BindEnv(readinessShutdownDuration)) runtime.Must(viper.BindPFlags(pflag.CommandLine)) runtime.Must(runtime.FeaturesBindEnv()) @@ -147,6 +153,7 @@ func parseEnvFlags() config { remoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag), totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag), allocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime), + ReadinessShutdownDuration: viper.GetDuration(readinessShutdownDuration), } } @@ -165,6 +172,7 @@ type config struct { totalRemoteAllocationTimeout time.Duration remoteAllocationTimeout time.Duration allocationBatchWaitTime time.Duration + ReadinessShutdownDuration time.Duration } // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC @@ -214,12 +222,29 @@ func main() { // This will test the connection to agones on each readiness probe // so if one of the allocator pod can't reach Kubernetes it will be removed // from the Kubernetes service.
+ ctx, cancelCtx := context.WithCancel(context.Background()) + podReady = true health.AddReadinessCheck("allocator-agones-client", func() error { + if !podReady { + return errors.New("asked to shut down, failed readiness check") + } _, err := agonesClient.ServerVersion() - return err + if err != nil { + return fmt.Errorf("failed to reach Kubernetes: %w", err) + } + return nil + }) + + signals.NewSigTermHandler(func() { + logger.Info("Pod shutdown has been requested, failing readiness check") + podReady = false + time.Sleep(conf.ReadinessShutdownDuration) + cancelCtx() + logger.Infof("Readiness shutdown duration has passed, exiting pod") + os.Exit(0) }) - h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime) + h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime) if !h.tlsDisabled { cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() { @@ -353,7 +378,7 @@ func runGRPC(h *serviceHandler, grpcPort int) { }() } -func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler { +func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler { defaultResync := 30 * time.Second agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) @@ -369,7 +394,6 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I totalRemoteAllocationTimeout, allocationBatchWaitTime) - ctx, _ := signals.NewSigKillContext() h := serviceHandler{ allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { return allocator.Allocate(ctx, gsa) diff --git a/install/helm/agones/templates/service/allocation.yaml b/install/helm/agones/templates/service/allocation.yaml index 03383e6eae..1915f7bdf9 100644 --- a/install/helm/agones/templates/service/allocation.yaml +++ b/install/helm/agones/templates/service/allocation.yaml @@ -175,6 +175,7 @@ spec: {{ toYaml .Values.agones.allocator.tolerations | indent 8 }} {{- end }} serviceAccountName: {{ $.Values.agones.serviceaccount.allocator.name }} + terminationGracePeriodSeconds: {{ mul .Values.agones.allocator.readiness.periodSeconds .Values.agones.allocator.readiness.failureThreshold 3 }} {{- if eq .Values.agones.allocator.disableTLS false }} volumes: - name: tls @@ -202,6 +203,9 @@ spec: httpGet: path: /ready port: 8080 + initialDelaySeconds: {{ .Values.agones.allocator.readiness.initialDelaySeconds }} + periodSeconds: {{ .Values.agones.allocator.readiness.periodSeconds }} + failureThreshold: {{ .Values.agones.allocator.readiness.failureThreshold }} env: {{- if .Values.agones.allocator.service.http.enabled }} - name: HTTP_PORT @@ -247,6 +251,8 @@ spec: value: {{ .Values.agones.featureGates | quote }} - name: ALLOCATION_BATCH_WAIT_TIME value: {{ 
.Values.agones.allocator.allocationBatchWaitTime | quote }} + - name: READINESS_SHUTDOWN_DURATION + value: {{ mul .Values.agones.allocator.readiness.periodSeconds .Values.agones.allocator.readiness.failureThreshold 2 }}s ports: {{- if .Values.agones.allocator.service.http.enabled }} - name: {{ .Values.agones.allocator.service.http.portName }} diff --git a/install/helm/agones/values.yaml b/install/helm/agones/values.yaml index 66156d3a82..e48e34881e 100644 --- a/install/helm/agones/values.yaml +++ b/install/helm/agones/values.yaml @@ -172,6 +172,10 @@ agones: periodSeconds: 3 failureThreshold: 3 timeoutSeconds: 1 + readiness: + initialDelaySeconds: 3 + periodSeconds: 3 + failureThreshold: 3 tolerations: - key: "agones.dev/agones-system" operator: "Equal" diff --git a/install/yaml/install.yaml b/install/yaml/install.yaml index af8e67d438..9a4e5b69e6 100644 --- a/install/yaml/install.yaml +++ b/install/yaml/install.yaml @@ -16136,6 +16136,7 @@ spec: operator: Equal value: "true" serviceAccountName: agones-allocator + terminationGracePeriodSeconds: 27 volumes: - name: tls secret: @@ -16159,6 +16160,9 @@ spec: httpGet: path: /ready port: 8080 + initialDelaySeconds: 3 + periodSeconds: 3 + failureThreshold: 3 env: - name: HTTP_PORT value: "8443" @@ -16200,6 +16204,8 @@ spec: value: "" - name: ALLOCATION_BATCH_WAIT_TIME value: "500ms" + - name: READINESS_SHUTDOWN_DURATION + value: 18s ports: - name: https containerPort: 8443 diff --git a/pkg/util/signals/signals.go b/pkg/util/signals/signals.go index 33d6584973..3d8c0573d2 100644 --- a/pkg/util/signals/signals.go +++ b/pkg/util/signals/signals.go @@ -32,7 +32,7 @@ func NewSigKillContext() (context.Context, context.CancelFunc) { // NewSigTermHandler creates a channel to listen to SIGTERM and runs the handle function func NewSigTermHandler(handle func()) { c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGTERM) + signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) go func() { <-c diff --git a/test/e2e/allocator/main_test.go b/test/e2e/allocator/main_test.go new file mode 100644 index 0000000000..04433f0d75 --- /dev/null +++ b/test/e2e/allocator/main_test.go @@ -0,0 +1,92 @@ +// Copyright 2023 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package allocator + +import ( + "context" + "os" + "strconv" + "testing" + "time" + + helper "agones.dev/agones/test/e2e/allochelper" + e2eframework "agones.dev/agones/test/e2e/framework" + log "github.com/sirupsen/logrus" +) + +var framework *e2eframework.Framework + +func TestMain(m *testing.M) { + log.SetFormatter(&log.TextFormatter{ + EnvironmentOverrideColors: true, + FullTimestamp: true, + TimestampFormat: "2006-01-02 15:04:05.000", + }) + + var ( + err error + exitCode int + ) + + if err = e2eframework.ParseTestFlags(); err != nil { + log.WithError(err).Error("failed to parse go test flags") + os.Exit(1) + } + + if framework, err = e2eframework.NewFromFlags(); err != nil { + log.WithError(err).Error("failed to setup framework") + os.Exit(1) + } + + if err = helper.CleanupNamespaces(context.Background(), framework); err != nil { + log.WithError(err).Error("failed to cleanup e2e namespaces") + os.Exit(1) + } + + if framework.Namespace == "" { + // use a custom namespace - Unix timestamp + framework.Namespace = strconv.Itoa(int(time.Now().Unix())) + log.Infof("Custom namespace is set: %s", framework.Namespace) + + if err := framework.CreateNamespace(framework.Namespace); err != nil { + log.WithError(err).Error("failed to create a custom namespace") + os.Exit(1) + } + + defer func() { + if derr := framework.DeleteNamespace(framework.Namespace); derr != nil { + log.Error(derr) + } + os.Exit(exitCode) + }() + } else { + // use an already existing namespace + // run cleanup before tests to ensure no resources from previous runs exist + err = framework.CleanUp(framework.Namespace) + if err != nil { + log.WithError(err).Error("failed to cleanup resources") + } + + defer func() { + err = framework.CleanUp(framework.Namespace) + if err != nil { + log.WithError(err).Error("failed to cleanup resources") + } + os.Exit(exitCode) + }() + } + + exitCode = m.Run() +} diff --git a/test/e2e/allocator/pod_termination_test.go b/test/e2e/allocator/pod_termination_test.go new file mode 100644 index 0000000000..05c8bb0f09 --- /dev/null +++ b/test/e2e/allocator/pod_termination_test.go @@ -0,0 +1,82 @@ +// Copyright 2023 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package allocator + +import ( + "context" + "testing" + "time" + + pb "agones.dev/agones/pkg/allocation/go" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + helper "agones.dev/agones/test/e2e/allochelper" + e2e "agones.dev/agones/test/e2e/framework" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + retryInterval = 1 * time.Second + retryTimeout = 30 * time.Second +) + +func TestAllocatorAfterDeleteReplica(t *testing.T) { + ctx := context.Background() + + // get allocator pods + list, err := helper.GetAgonesAllocatorPods(ctx, framework) + assert.NoError(t, err) + + grpcClient, err := helper.GetAllocatorClient(ctx, t, framework) + require.NoError(t, err, "Could not initialize rpc client") + + // create fleet + flt, err := helper.CreateFleet(ctx, framework.Namespace, framework) + if !assert.Nil(t, err) { + return + } + framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas)) + + var response *pb.AllocationResponse + request := &pb.AllocationRequest{ + Namespace: framework.Namespace, + PreferredGameServerSelectors: []*pb.GameServerSelector{{MatchLabels: map[string]string{agonesv1.FleetNameLabel: flt.ObjectMeta.Name}}}, + Scheduling: pb.AllocationRequest_Packed, + Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest"}}, + } + + // delete 2 of the allocators + for _, pod := range list.Items[1:] { + err = helper.DeleteAgonesAllocatorPod(ctx, pod.ObjectMeta.Name, framework) + require.NoError(t, err, "Could not delete allocator pod") + } + + response, err = grpcClient.Allocate(context.Background(), request) + logrus.Info(response) + helper.ValidateAllocatorResponse(t, response) + require.NoError(t, err, "Failed grpc allocation request") + + // Wait and make another allocation call once we know the draining time has passed + _ = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { + return false, nil + }) + + response, err = grpcClient.Allocate(context.Background(), request) + logrus.Info(response) + helper.ValidateAllocatorResponse(t, response) + require.NoError(t, err, "Failed second grpc allocation request") +} diff --git a/test/e2e/allocator_test.go b/test/e2e/allocator_test.go index 7850d47c59..237af76c92 100644 --- a/test/e2e/allocator_test.go +++ b/test/e2e/allocator_test.go @@ -17,17 +17,9 @@ package e2e import ( "bytes" "context" - "crypto/rand" - "crypto/rsa" - "crypto/tls" - "crypto/x509" - "crypto/x509/pkix" "encoding/json" - "encoding/pem" "fmt" "io" - "math/big" - "net" "net/http" "testing" "time" @@ -36,14 +28,13 @@ import ( agonesv1 "agones.dev/agones/pkg/apis/agones/v1" multiclusterv1 "agones.dev/agones/pkg/apis/multicluster/v1" "agones.dev/agones/pkg/util/runtime" + helper "agones.dev/agones/test/e2e/allochelper" e2e "agones.dev/agones/test/e2e/framework" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" @@ -64,20 +55,20 @@ const ( func TestAllocatorWithDeprecatedRequired(t *testing.T) { ctx := context.Background() - ip, port := getAllocatorEndpoint(ctx, t) + ip, port := helper.GetAllocatorEndpoint(ctx, t, framework) requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) - tlsCA := refreshAllocatorTLSCerts(ctx, t, 
ip) + tlsCA := helper.RefreshAllocatorTLSCerts(ctx, t, ip, framework) var flt *agonesv1.Fleet var err error if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { - flt, err = createFleetWithOpts(ctx, framework.Namespace, func(f *agonesv1.Fleet) { + flt, err = helper.CreateFleetWithOpts(ctx, framework.Namespace, framework, func(f *agonesv1.Fleet) { f.Spec.Template.Spec.Players = &agonesv1.PlayersSpec{ InitialCapacity: 10, } }) } else { - flt, err = createFleet(ctx, framework.Namespace) + flt, err = helper.CreateFleet(ctx, framework.Namespace, framework) } assert.NoError(t, err) @@ -94,7 +85,7 @@ func TestAllocatorWithDeprecatedRequired(t *testing.T) { // wait for the allocation system to come online err = wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) { // create the grpc client each time, as we may end up looking at an old cert - dialOpts, err := createRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA) + dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } @@ -112,7 +103,7 @@ func TestAllocatorWithDeprecatedRequired(t *testing.T) { logrus.WithError(err).Info("failing Allocate request") return false, nil } - validateAllocatorResponse(t, response) + helper.ValidateAllocatorResponse(t, response) // let's do a re-allocation if runtime.FeatureEnabled(runtime.FeatureStateAllocationFilter) && runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { @@ -122,7 +113,7 @@ func TestAllocatorWithDeprecatedRequired(t *testing.T) { allocatedResponse, err := grpcClient.Allocate(context.Background(), request) require.NoError(t, err) require.Equal(t, response.GameServerName, allocatedResponse.GameServerName) - validateAllocatorResponse(t, allocatedResponse) + helper.ValidateAllocatorResponse(t, allocatedResponse) // do a capacity based allocation logrus.Info("testing capacity allocation filter") @@ -134,7 +125,7 @@ func TestAllocatorWithDeprecatedRequired(t *testing.T) { allocatedResponse, err = grpcClient.Allocate(context.Background(), request) require.NoError(t, err) require.Equal(t, response.GameServerName, allocatedResponse.GameServerName) - validateAllocatorResponse(t, allocatedResponse) + helper.ValidateAllocatorResponse(t, allocatedResponse) // do a capacity based allocation that should fail // nolint:staticcheck @@ -160,20 +151,20 @@ func TestAllocatorWithDeprecatedRequired(t *testing.T) { func TestAllocatorWithSelectors(t *testing.T) { ctx := context.Background() - ip, port := getAllocatorEndpoint(ctx, t) + ip, port := helper.GetAllocatorEndpoint(ctx, t, framework) requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) - tlsCA := refreshAllocatorTLSCerts(ctx, t, ip) + tlsCA := helper.RefreshAllocatorTLSCerts(ctx, t, ip, framework) var flt *agonesv1.Fleet var err error if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { - flt, err = createFleetWithOpts(ctx, framework.Namespace, func(f *agonesv1.Fleet) { + flt, err = helper.CreateFleetWithOpts(ctx, framework.Namespace, framework, func(f *agonesv1.Fleet) { f.Spec.Template.Spec.Players = &agonesv1.PlayersSpec{ InitialCapacity: 10, } }) } else { - flt, err = createFleet(ctx, framework.Namespace) + flt, err = helper.CreateFleet(ctx, framework.Namespace, framework) } assert.NoError(t, err) @@ -189,7 +180,7 @@ func TestAllocatorWithSelectors(t *testing.T) { // wait for the allocation system to come online err = 
wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) { // create the grpc client each time, as we may end up looking at an old cert - dialOpts, err := createRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA) + dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } @@ -207,7 +198,7 @@ func TestAllocatorWithSelectors(t *testing.T) { logrus.WithError(err).Info("failing Allocate request") return false, nil } - validateAllocatorResponse(t, response) + helper.ValidateAllocatorResponse(t, response) // let's do a re-allocation if runtime.FeatureEnabled(runtime.FeatureStateAllocationFilter) && runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { @@ -216,7 +207,7 @@ func TestAllocatorWithSelectors(t *testing.T) { allocatedResponse, err := grpcClient.Allocate(context.Background(), request) require.NoError(t, err) require.Equal(t, response.GameServerName, allocatedResponse.GameServerName) - validateAllocatorResponse(t, allocatedResponse) + helper.ValidateAllocatorResponse(t, allocatedResponse) // do a capacity based allocation logrus.Info("testing capacity allocation filter") @@ -227,7 +218,7 @@ func TestAllocatorWithSelectors(t *testing.T) { allocatedResponse, err = grpcClient.Allocate(context.Background(), request) require.NoError(t, err) require.Equal(t, response.GameServerName, allocatedResponse.GameServerName) - validateAllocatorResponse(t, allocatedResponse) + helper.ValidateAllocatorResponse(t, allocatedResponse) // do a capacity based allocation that should fail request.GameServerSelectors[0].GameServerState = pb.GameServerSelector_ALLOCATED @@ -249,11 +240,11 @@ func TestAllocatorWithSelectors(t *testing.T) { func TestRestAllocatorWithDeprecatedRequired(t *testing.T) { ctx := context.Background() - ip, port := getAllocatorEndpoint(ctx, t) + ip, port := helper.GetAllocatorEndpoint(ctx, t, framework) requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) - tlsCA := refreshAllocatorTLSCerts(ctx, t, ip) + tlsCA := helper.RefreshAllocatorTLSCerts(ctx, t, ip, framework) - flt, err := createFleet(ctx, framework.Namespace) + flt, err := helper.CreateFleet(ctx, framework.Namespace, framework) if !assert.Nil(t, err) { return } @@ -265,7 +256,7 @@ func TestRestAllocatorWithDeprecatedRequired(t *testing.T) { Scheduling: pb.AllocationRequest_Packed, Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest"}}, } - tlsCfg, err := getTLSConfig(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA) + tlsCfg, err := helper.GetTLSConfig(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if !assert.Nil(t, err) { return } @@ -303,7 +294,7 @@ func TestRestAllocatorWithDeprecatedRequired(t *testing.T) { logrus.WithError(err).Info("failed to unmarshal Allocate response") return false, nil } - validateAllocatorResponse(t, &response) + helper.ValidateAllocatorResponse(t, &response) return true, nil }) @@ -313,11 +304,11 @@ func TestRestAllocatorWithDeprecatedRequired(t *testing.T) { func TestRestAllocatorWithSelectors(t *testing.T) { ctx := context.Background() - ip, port := getAllocatorEndpoint(ctx, t) + ip, port := helper.GetAllocatorEndpoint(ctx, t, framework) requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) - tlsCA := refreshAllocatorTLSCerts(ctx, t, ip) + tlsCA := helper.RefreshAllocatorTLSCerts(ctx, t, ip, framework) - flt, err := createFleet(ctx, 
framework.Namespace) + flt, err := helper.CreateFleet(ctx, framework.Namespace, framework) if !assert.Nil(t, err) { return } @@ -328,7 +319,7 @@ func TestRestAllocatorWithSelectors(t *testing.T) { Scheduling: pb.AllocationRequest_Packed, Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest", "blue-frog.fred_thing": "test.dog_fred-blue"}}, } - tlsCfg, err := getTLSConfig(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA) + tlsCfg, err := helper.GetTLSConfig(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) if !assert.Nil(t, err) { return } @@ -366,7 +357,7 @@ func TestRestAllocatorWithSelectors(t *testing.T) { logrus.WithError(err).Info("failed to unmarshal Allocate response") return false, nil } - validateAllocatorResponse(t, &response) + helper.ValidateAllocatorResponse(t, &response) return true, nil }) require.NoError(t, err) @@ -384,13 +375,13 @@ func TestRestAllocatorWithSelectors(t *testing.T) { func TestAllocatorCrossNamespace(t *testing.T) { ctx := context.Background() - ip, port := getAllocatorEndpoint(ctx, t) + ip, port := helper.GetAllocatorEndpoint(ctx, t, framework) requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) - tlsCA := refreshAllocatorTLSCerts(ctx, t, ip) + tlsCA := helper.RefreshAllocatorTLSCerts(ctx, t, ip, framework) // Create namespaces A and B namespaceA := framework.Namespace // let's reuse an existing one - copyDefaultAllocatorClientSecret(ctx, t, namespaceA) + helper.CopyDefaultAllocatorClientSecret(ctx, t, namespaceA, framework) namespaceB := fmt.Sprintf("allocator-b-%s", uuid.NewUUID()) err := framework.CreateNamespace(namespaceB) @@ -420,10 +411,10 @@ func TestAllocatorCrossNamespace(t *testing.T) { }, }, } - createAllocationPolicy(ctx, t, p) + helper.CreateAllocationPolicy(ctx, t, framework, p) // Create a fleet in namespace B. 
Allocation should not happen in A according to policy - flt, err := createFleet(ctx, namespaceB) + flt, err := helper.CreateFleet(ctx, namespaceB, framework) if !assert.Nil(t, err) { return } @@ -438,7 +429,7 @@ func TestAllocatorCrossNamespace(t *testing.T) { // wait for the allocation system to come online err = wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) { // create the grpc client each time, as we may end up looking at an old cert - dialOpts, err := createRemoteClusterDialOption(ctx, namespaceA, allocatorClientSecretName, tlsCA) + dialOpts, err := helper.CreateRemoteClusterDialOption(ctx, namespaceA, allocatorClientSecretName, tlsCA, framework) if err != nil { return false, err } @@ -456,200 +447,9 @@ func TestAllocatorCrossNamespace(t *testing.T) { logrus.WithError(err).Info("failing Allocate request") return false, nil } - validateAllocatorResponse(t, response) + helper.ValidateAllocatorResponse(t, response) return true, nil }) assert.NoError(t, err) } - -func copyDefaultAllocatorClientSecret(ctx context.Context, t *testing.T, toNamespace string) { - kubeCore := framework.KubeClient.CoreV1() - clientSecret, err := kubeCore.Secrets(allocatorClientSecretNamespace).Get(ctx, allocatorClientSecretName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Could not retrieve default allocator client secret %s/%s: %v", allocatorClientSecretNamespace, allocatorClientSecretName, err) - } - clientSecret.ObjectMeta.Namespace = toNamespace - clientSecret.ResourceVersion = "" - _, err = kubeCore.Secrets(toNamespace).Create(ctx, clientSecret, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Could not copy default allocator client %s/%s secret to namespace %s: %v", allocatorClientSecretNamespace, allocatorClientSecretName, toNamespace, err) - } -} - -func createAllocationPolicy(ctx context.Context, t *testing.T, p *multiclusterv1.GameServerAllocationPolicy) { - t.Helper() - - mc := framework.AgonesClient.MulticlusterV1() - policy, err := mc.GameServerAllocationPolicies(p.Namespace).Create(ctx, p, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("creating allocation policy failed: %s", err) - } - t.Logf("created allocation policy %v", policy) -} - -func getAllocatorEndpoint(ctx context.Context, t *testing.T) (string, int32) { - kubeCore := framework.KubeClient.CoreV1() - svc, err := kubeCore.Services(agonesSystemNamespace).Get(ctx, allocatorServiceName, metav1.GetOptions{}) - if !assert.Nil(t, err) { - t.FailNow() - } - if !assert.NotNil(t, svc.Status.LoadBalancer) { - t.FailNow() - } - if !assert.Equal(t, 1, len(svc.Status.LoadBalancer.Ingress)) { - t.FailNow() - } - if !assert.NotNil(t, 0, svc.Status.LoadBalancer.Ingress[0].IP) { - t.FailNow() - } - - port := svc.Spec.Ports[0] - return svc.Status.LoadBalancer.Ingress[0].IP, port.Port -} - -// createRemoteClusterDialOption creates a grpc client dial option with proper certs to make a remote call. 
-// -//nolint:unparam -func createRemoteClusterDialOption(ctx context.Context, namespace, clientSecretName string, tlsCA []byte) (grpc.DialOption, error) { - tlsConfig, err := getTLSConfig(ctx, namespace, clientSecretName, tlsCA) - if err != nil { - return nil, err - } - - return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil -} - -func getTLSConfig(ctx context.Context, namespace, clientSecretName string, tlsCA []byte) (*tls.Config, error) { - kubeCore := framework.KubeClient.CoreV1() - clientSecret, err := kubeCore.Secrets(namespace).Get(ctx, clientSecretName, metav1.GetOptions{}) - if err != nil { - return nil, errors.Errorf("getting client secret %s/%s failed: %s", namespace, clientSecretName, err) - } - - // Create http client using cert - clientCert := clientSecret.Data[tlsCrtTag] - clientKey := clientSecret.Data[tlsKeyTag] - if clientCert == nil || clientKey == nil { - return nil, errors.New("missing certificate") - } - - // Load client cert - cert, err := tls.X509KeyPair(clientCert, clientKey) - if err != nil { - return nil, err - } - - rootCA := x509.NewCertPool() - if !rootCA.AppendCertsFromPEM(tlsCA) { - return nil, errors.New("could not append PEM format CA cert") - } - - return &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: rootCA, - }, nil -} - -func createFleet(ctx context.Context, namespace string) (*agonesv1.Fleet, error) { - return createFleetWithOpts(ctx, namespace, func(*agonesv1.Fleet) {}) -} - -func createFleetWithOpts(ctx context.Context, namespace string, opts func(fleet *agonesv1.Fleet)) (*agonesv1.Fleet, error) { - fleets := framework.AgonesClient.AgonesV1().Fleets(namespace) - fleet := defaultFleet(namespace) - opts(fleet) - return fleets.Create(ctx, fleet, metav1.CreateOptions{}) -} - -func refreshAllocatorTLSCerts(ctx context.Context, t *testing.T, host string) []byte { - t.Helper() - - pub, priv := generateTLSCertPair(t, host) - // verify key pair - if _, err := tls.X509KeyPair(pub, priv); err != nil { - t.Fatalf("generated key pair failed create cert: %s", err) - } - - kubeCore := framework.KubeClient.CoreV1() - - require.Eventually(t, func() bool { - s, err := kubeCore.Secrets(agonesSystemNamespace).Get(ctx, allocatorTLSName, metav1.GetOptions{}) - if err != nil { - t.Logf("failed getting secret %s/%s failed: %s", agonesSystemNamespace, allocatorTLSName, err) - return false - } - - s.Data[tlsCrtTag] = pub - s.Data[tlsKeyTag] = priv - if _, err := kubeCore.Secrets(agonesSystemNamespace).Update(ctx, s, metav1.UpdateOptions{}); err != nil { - t.Logf("failed updating secrets failed: %s", err) - return false - } - - return true - }, time.Minute, time.Second, "Could not update Secret") - - t.Logf("Allocator TLS is refreshed with public CA: %s for endpoint %s", string(pub), host) - return pub -} - -func generateTLSCertPair(t *testing.T, host string) ([]byte, []byte) { - t.Helper() - - priv, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - t.Fatalf("generating RSA key failed: %s", err) - } - - notBefore := time.Now() - notAfter := notBefore.Add(time.Hour) - - serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) - serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) - if err != nil { - t.Fatalf("generating serial number failed: %s", err) - } - - template := x509.Certificate{ - SerialNumber: serialNumber, - Subject: pkix.Name{ - CommonName: host, - Organization: []string{"testing"}, - }, - NotBefore: notBefore, - NotAfter: notAfter, - KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | 
x509.KeyUsageCertSign, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, - SignatureAlgorithm: x509.SHA1WithRSA, - BasicConstraintsValid: true, - IsCA: true, - } - - if host != "" { - template.IPAddresses = []net.IP{net.ParseIP(host)} - } - derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) - if err != nil { - t.Fatalf("creating certificate failed: %s", err) - } - pemPubBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) - privBytes, err := x509.MarshalPKCS8PrivateKey(priv) - if err != nil { - t.Fatalf("marshalling private key failed: %v", err) - } - pemPrivBytes := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) - - return pemPubBytes, pemPrivBytes -} - -func validateAllocatorResponse(t *testing.T, resp *pb.AllocationResponse) { - t.Helper() - if !assert.NotNil(t, resp) { - return - } - assert.Greater(t, len(resp.Ports), 0) - assert.NotEmpty(t, resp.GameServerName) - assert.NotEmpty(t, resp.Address) - assert.NotEmpty(t, resp.NodeName) -} diff --git a/test/e2e/allochelper/helper_func.go b/test/e2e/allochelper/helper_func.go new file mode 100644 index 0000000000..4ce897ac0e --- /dev/null +++ b/test/e2e/allochelper/helper_func.go @@ -0,0 +1,368 @@ +// Copyright 2023 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// Package allochelper is a package for helper functions that are used by e2e tests +package allochelper + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "math/big" + "net" + "testing" + "time" + + pb "agones.dev/agones/pkg/allocation/go" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + multiclusterv1 "agones.dev/agones/pkg/apis/multicluster/v1" + e2e "agones.dev/agones/test/e2e/framework" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + agonesSystemNamespace = "agones-system" + allocatorServiceName = "agones-allocator" + allocatorTLSName = "allocator-tls" + tlsCrtTag = "tls.crt" + tlsKeyTag = "tls.key" + allocatorReqURLFmt = "%s:%d" + allocatorClientSecretName = "allocator-client.default" + allocatorClientSecretNamespace = "default" + replicasCount = 3 +) + +// CopyDefaultAllocatorClientSecret copies the default allocator client secret to the given namespace +func CopyDefaultAllocatorClientSecret(ctx context.Context, t *testing.T, toNamespace string, framework *e2e.Framework) { + kubeCore := framework.KubeClient.CoreV1() + clientSecret, err := kubeCore.Secrets(allocatorClientSecretNamespace).Get(ctx, allocatorClientSecretName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Could not retrieve default allocator client secret %s/%s: %v", allocatorClientSecretNamespace, allocatorClientSecretName, err) + } + clientSecret.ObjectMeta.Namespace = toNamespace + clientSecret.ResourceVersion = "" + _, err = kubeCore.Secrets(toNamespace).Create(ctx, clientSecret, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not copy default allocator client %s/%s secret to namespace %s: %v", allocatorClientSecretNamespace, allocatorClientSecretName, toNamespace, err) + } +} + +// CreateAllocationPolicy creates an allocation policy +func CreateAllocationPolicy(ctx context.Context, t *testing.T, framework *e2e.Framework, p *multiclusterv1.GameServerAllocationPolicy) { + t.Helper() + + mc := framework.AgonesClient.MulticlusterV1() + policy, err := mc.GameServerAllocationPolicies(p.Namespace).Create(ctx, p, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("creating allocation policy failed: %s", err) + } + t.Logf("created allocation policy %v", policy) +} + +// GetAllocatorEndpoint gets the allocator LB endpoint +func GetAllocatorEndpoint(ctx context.Context, t *testing.T, framework *e2e.Framework) (string, int32) { + kubeCore := framework.KubeClient.CoreV1() + svc, err := kubeCore.Services(agonesSystemNamespace).Get(ctx, allocatorServiceName, metav1.GetOptions{}) + if !assert.Nil(t, err) { + t.FailNow() + } + if !assert.NotNil(t, svc.Status.LoadBalancer) { + t.FailNow() + } + if !assert.Equal(t, 1, len(svc.Status.LoadBalancer.Ingress)) { + t.FailNow() + } + if !assert.NotNil(t, 0, svc.Status.LoadBalancer.Ingress[0].IP) { + t.FailNow() + } + + port := svc.Spec.Ports[0] + return svc.Status.LoadBalancer.Ingress[0].IP, port.Port +} + +// CreateRemoteClusterDialOption creates a grpc client dial option with proper certs to make a remote call.
+ +// +//nolint:unparam +func CreateRemoteClusterDialOption(ctx context.Context, namespace, clientSecretName string, tlsCA []byte, framework *e2e.Framework) (grpc.DialOption, error) { + tlsConfig, err := GetTLSConfig(ctx, namespace, clientSecretName, tlsCA, framework) + if err != nil { + return nil, err + } + + return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil +} + +// GetTLSConfig builds the TLS config from the client secret in the given namespace +func GetTLSConfig(ctx context.Context, namespace, clientSecretName string, tlsCA []byte, framework *e2e.Framework) (*tls.Config, error) { + kubeCore := framework.KubeClient.CoreV1() + clientSecret, err := kubeCore.Secrets(namespace).Get(ctx, clientSecretName, metav1.GetOptions{}) + if err != nil { + return nil, errors.Errorf("getting client secret %s/%s failed: %s", namespace, clientSecretName, err) + } + + // Create http client using cert + clientCert := clientSecret.Data[tlsCrtTag] + clientKey := clientSecret.Data[tlsKeyTag] + if clientCert == nil || clientKey == nil { + return nil, errors.New("missing certificate") + } + + // Load client cert + cert, err := tls.X509KeyPair(clientCert, clientKey) + if err != nil { + return nil, err + } + + rootCA := x509.NewCertPool() + if !rootCA.AppendCertsFromPEM(tlsCA) { + return nil, errors.New("could not append PEM format CA cert") + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: rootCA, + }, nil +} + +// CreateFleet creates a game server fleet +func CreateFleet(ctx context.Context, namespace string, framework *e2e.Framework) (*agonesv1.Fleet, error) { + return CreateFleetWithOpts(ctx, namespace, framework, func(*agonesv1.Fleet) {}) +} + +// CreateFleetWithOpts creates a game server fleet with the designated options +func CreateFleetWithOpts(ctx context.Context, namespace string, framework *e2e.Framework, opts func(fleet *agonesv1.Fleet)) (*agonesv1.Fleet, error) { + fleets := framework.AgonesClient.AgonesV1().Fleets(namespace) + fleet := defaultFleet(namespace, framework) + opts(fleet) + return fleets.Create(ctx, fleet, metav1.CreateOptions{}) +} + +// RefreshAllocatorTLSCerts refreshes the allocator TLS cert with a newly generated cert +func RefreshAllocatorTLSCerts(ctx context.Context, t *testing.T, host string, framework *e2e.Framework) []byte { + t.Helper() + + pub, priv := generateTLSCertPair(t, host) + // verify key pair + if _, err := tls.X509KeyPair(pub, priv); err != nil { + t.Fatalf("generated key pair failed to create cert: %s", err) + } + + kubeCore := framework.KubeClient.CoreV1() + + require.Eventually(t, func() bool { + s, err := kubeCore.Secrets(agonesSystemNamespace).Get(ctx, allocatorTLSName, metav1.GetOptions{}) + if err != nil { + t.Logf("failed getting secret %s/%s: %s", agonesSystemNamespace, allocatorTLSName, err) + return false + } + + s.Data[tlsCrtTag] = pub + s.Data[tlsKeyTag] = priv + if _, err := kubeCore.Secrets(agonesSystemNamespace).Update(ctx, s, metav1.UpdateOptions{}); err != nil { + t.Logf("failed updating secret: %s", err) + return false + } + + return true + }, time.Minute, time.Second, "Could not update Secret") + + t.Logf("Allocator TLS is refreshed with public CA: %s for endpoint %s", string(pub), host) + return pub +} + +func generateTLSCertPair(t *testing.T, host string) ([]byte, []byte) { + t.Helper() + + priv, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + t.Fatalf("generating RSA key failed: %s", err) + } + + notBefore := time.Now() + notAfter := notBefore.Add(time.Hour) + + serialNumberLimit :=
new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + t.Fatalf("generating serial number failed: %s", err) + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + CommonName: host, + Organization: []string{"testing"}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + SignatureAlgorithm: x509.SHA1WithRSA, + BasicConstraintsValid: true, + IsCA: true, + } + + if host != "" { + template.IPAddresses = []net.IP{net.ParseIP(host)} + } + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + t.Fatalf("creating certificate failed: %s", err) + } + pemPubBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + t.Fatalf("marshalling private key failed: %v", err) + } + pemPrivBytes := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}) + + return pemPubBytes, pemPrivBytes +} + +// ValidateAllocatorResponse validates the response returned by the allocator +func ValidateAllocatorResponse(t *testing.T, resp *pb.AllocationResponse) { + t.Helper() + if !assert.NotNil(t, resp) { + return + } + assert.Greater(t, len(resp.Ports), 0) + assert.NotEmpty(t, resp.GameServerName) + assert.NotEmpty(t, resp.Address) + assert.NotEmpty(t, resp.NodeName) +} + +// DeleteAgonesAllocatorPod deletes an Agones allocator pod +func DeleteAgonesAllocatorPod(ctx context.Context, podName string, framework *e2e.Framework) error { + policy := metav1.DeletePropagationBackground + err := framework.KubeClient.CoreV1().Pods("agones-system").Delete(ctx, podName, + metav1.DeleteOptions{PropagationPolicy: &policy}) + return err +} + +// GetAgonesAllocatorPods returns all the Agones allocator pods +func GetAgonesAllocatorPods(ctx context.Context, framework *e2e.Framework) (*corev1.PodList, error) { + opts := metav1.ListOptions{LabelSelector: labels.Set{"multicluster.agones.dev/role": "allocator"}.String()} + return framework.KubeClient.CoreV1().Pods("agones-system").List(ctx, opts) +} + +// GetAllocatorClient creates a client and ensures that it can be connected to +func GetAllocatorClient(ctx context.Context, t *testing.T, framework *e2e.Framework) (pb.AllocationServiceClient, error) { + ip, port := GetAllocatorEndpoint(ctx, t, framework) + requestURL := fmt.Sprintf(allocatorReqURLFmt, ip, port) + tlsCA := RefreshAllocatorTLSCerts(ctx, t, ip, framework) + + flt, err := CreateFleet(ctx, framework.Namespace, framework) + if !assert.Nil(t, err) { + return nil, err + } + framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas)) + + dialOpts, err := CreateRemoteClusterDialOption(ctx, allocatorClientSecretNamespace, allocatorClientSecretName, tlsCA, framework) + if err != nil { + return nil, err + } + + conn, err := grpc.Dial(requestURL, dialOpts) + require.NoError(t, err, "Failed grpc.Dial") + go func() { + <-ctx.Done() + conn.Close() // nolint: errcheck + }() + + grpcClient := pb.NewAllocationServiceClient(conn) + + request := &pb.AllocationRequest{ + Namespace: framework.Namespace, + PreferredGameServerSelectors: []*pb.GameServerSelector{{MatchLabels: map[string]string{agonesv1.FleetNameLabel: flt.ObjectMeta.Name}}}, + Scheduling: pb.AllocationRequest_Packed, +
Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest"}}, + } + + var response *pb.AllocationResponse + err = wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) { + response, err = grpcClient.Allocate(context.Background(), request) + if err != nil { + logrus.WithError(err).Info("failing Allocate request") + return false, nil + } + ValidateAllocatorResponse(t, response) + return true, nil + }) + if err != nil { + return nil, err + } + + return grpcClient, nil +} + +// CleanupNamespaces cleans up all e2e test namespaces +func CleanupNamespaces(ctx context.Context, framework *e2e.Framework) error { + // list all e2e namespaces + opts := metav1.ListOptions{LabelSelector: labels.Set(e2e.NamespaceLabel).String()} + list, err := framework.KubeClient.CoreV1().Namespaces().List(ctx, opts) + if err != nil { + return err + } + + // loop through them, and delete them + for _, ns := range list.Items { + if err := framework.DeleteNamespace(ns.ObjectMeta.Name); err != nil { + cause := errors.Cause(err) + if k8serrors.IsConflict(cause) { + logrus.WithError(cause).Warn("namespace already being deleted") + continue + } + // here just in case we need to catch other errors + logrus.WithField("reason", k8serrors.ReasonForError(cause)).Info("cause for namespace deletion error") + return cause + } + } + + return nil +} + +// From fleet_test +// defaultFleet returns a default fleet configuration +func defaultFleet(namespace string, framework *e2e.Framework) *agonesv1.Fleet { + gs := framework.DefaultGameServer(namespace) + return fleetWithGameServerSpec(&gs.Spec, namespace) +} + +// fleetWithGameServerSpec returns a fleet with the specified gameserver spec +func fleetWithGameServerSpec(gsSpec *agonesv1.GameServerSpec, namespace string) *agonesv1.Fleet { + return &agonesv1.Fleet{ + ObjectMeta: metav1.ObjectMeta{GenerateName: "simple-fleet-1.0", Namespace: namespace}, + Spec: agonesv1.FleetSpec{ + Replicas: replicasCount, + Template: agonesv1.GameServerTemplateSpec{ + Spec: *gsSpec, + }, + }, + } +}
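
Note on the shutdown flow introduced in cmd/allocator/main.go above: on SIGTERM/SIGINT the allocator flips its readiness check to failing, sleeps for READINESS_SHUTDOWN_DURATION so Kubernetes has time to pull the pod out of the Service endpoints (and the terminationGracePeriodSeconds of 27 covers that window), and only then exits. A minimal standalone sketch of that pattern follows; the plain net/http readiness endpoint and the hard-coded 18s value are simplifying assumptions standing in for the healthcheck handler and the configured flag in the actual patch.

package main

import (
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

func main() {
	var ready atomic.Bool
	ready.Store(true)

	// Readiness endpoint: reports failure as soon as shutdown has been requested,
	// so the pod is removed from the Service endpoints before the process exits.
	http.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) {
		if !ready.Load() {
			http.Error(w, "shutting down", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	// Stand-in for the READINESS_SHUTDOWN_DURATION value (18s in install.yaml above).
	readinessShutdownDuration := 18 * time.Second

	// SIGTERM/SIGINT handler: fail readiness, wait out the drain window, then exit.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-c
		ready.Store(false)
		time.Sleep(readinessShutdownDuration)
		os.Exit(0)
	}()

	_ = http.ListenAndServe(":8080", nil)
}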