Skip to content

Commit

Permalink
Ading a more graceful termination for allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
chiayi committed May 2, 2023
1 parent 08df7cb commit d550adb
Show file tree
Hide file tree
Showing 10 changed files with 641 additions and 240 deletions.
19 changes: 19 additions & 0 deletions build/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ test-e2e:
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-integration
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-failure
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-ha-extensions
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-allocator-crash


# e2e test args:
Expand Down Expand Up @@ -364,6 +365,24 @@ else
endif
echo "Finishing e2e extensions high availability test!"

test-e2e-allocator-crash: FEATURE_GATES ?= $(ALPHA_FEATURE_GATES)
test-e2e-allocator-crash: CLOUD_PRODUCT ?= generic
test-e2e-allocator-crash: GO_E2E_TEST_INTEGRATION_ARGS ?=\
--cloud-product=$(CLOUD_PRODUCT) \
--gameserver-image=$(GS_TEST_IMAGE) \
--feature-gates=$(FEATURE_GATES) \
--pullsecret=$(IMAGE_PULL_SECRET)
test-e2e-allocator-crash: $(ensure-build-image)
echo "Starting e2e allocation pod deletion test!"
ifdef E2E_USE_GOTESTSUM
$(GOTESTSUM) --packages=$(agones_package)/test/e2e/allocator -- $(go_test_args) $(ARGS) -parallel=1 -args \
$(GO_E2E_TEST_ARGS) $(GO_E2E_TEST_INTEGRATION_ARGS)
else
$(GO_TEST) $(ARGS) -parallel=1 $(agones_package)/test/e2e/allocator -args \
$(GO_E2E_TEST_ARGS) $(GO_E2E_TEST_INTEGRATION_ARGS)
endif
echo "Finishing e2e allocation pod deletion test!"

# Runs end-to-end stress tests on the current configured cluster
# For minikube user the minikube-stress-test-e2e targets
stress-test-e2e: $(ensure-build-image)
Expand Down
34 changes: 29 additions & 5 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ import (
"k8s.io/client-go/rest"
)

var logger = runtime.NewLoggerWithSource("main")
var (
podReady bool
logger = runtime.NewLoggerWithSource("main")
)

const (
certDir = "/home/allocator/client-ca/"
Expand All @@ -77,6 +80,7 @@ const (
apiServerBurstQPSFlag = "api-server-qps-burst"
logLevelFlag = "log-level"
allocationBatchWaitTime = "allocation-batch-wait-time"
readinessShutdownDuration = "readiness-shutdown-duration"
)

func parseEnvFlags() config {
Expand Down Expand Up @@ -109,6 +113,7 @@ func parseEnvFlags() config {
pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.")
pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
pflag.Duration(readinessShutdownDuration, viper.GetDuration(readinessShutdownDuration), "Time in seconds for SIGTERM/SIGINT handler to sleep for.")
runtime.FeaturesBindFlags()
pflag.Parse()

Expand All @@ -127,6 +132,7 @@ func parseEnvFlags() config {
runtime.Must(viper.BindEnv(totalRemoteAllocationTimeoutFlag))
runtime.Must(viper.BindEnv(logLevelFlag))
runtime.Must(viper.BindEnv(allocationBatchWaitTime))
runtime.Must(viper.BindEnv(readinessShutdownDuration))
runtime.Must(viper.BindPFlags(pflag.CommandLine))
runtime.Must(runtime.FeaturesBindEnv())

Expand All @@ -147,6 +153,7 @@ func parseEnvFlags() config {
remoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag),
totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
allocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime),
ReadinessShutdownDuration: viper.GetDuration(readinessShutdownDuration),
}
}

Expand All @@ -165,6 +172,7 @@ type config struct {
totalRemoteAllocationTimeout time.Duration
remoteAllocationTimeout time.Duration
allocationBatchWaitTime time.Duration
ReadinessShutdownDuration time.Duration
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
Expand Down Expand Up @@ -214,12 +222,29 @@ func main() {
// This will test the connection to agones on each readiness probe
// so if one of the allocator pod can't reach Kubernetes it will be removed
// from the Kubernetes service.
ctx, cancelCtx := context.WithCancel(context.Background())
podReady = true
health.AddReadinessCheck("allocator-agones-client", func() error {
if !podReady {
return errors.New("asked to shut down, failed readiness check")
}
_, err := agonesClient.ServerVersion()
return err
if err != nil {
return fmt.Errorf("failed to reach Kubernetes: %w", err)
}
return nil
})

signals.NewSigTermHandler(func() {
logger.Info("Pod shutdown has been requested, failing readiness check")
podReady = false
time.Sleep(conf.ReadinessShutdownDuration)
cancelCtx()
logger.Infof("Readiness shutdown duration has passed, exiting pod")
os.Exit(0)
})

h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)
h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)

if !h.tlsDisabled {
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
Expand Down Expand Up @@ -353,7 +378,7 @@ func runGRPC(h *serviceHandler, grpcPort int) {
}()
}

func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
Expand All @@ -369,7 +394,6 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
totalRemoteAllocationTimeout,
allocationBatchWaitTime)

ctx, _ := signals.NewSigKillContext()
h := serviceHandler{
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return allocator.Allocate(ctx, gsa)
Expand Down
6 changes: 6 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ spec:
{{ toYaml .Values.agones.allocator.tolerations | indent 8 }}
{{- end }}
serviceAccountName: {{ $.Values.agones.serviceaccount.allocator.name }}
terminationGracePeriodSeconds: {{ mul .Values.agones.allocator.readiness.periodSeconds .Values.agones.allocator.readiness.failureThreshold 3 }}
{{- if eq .Values.agones.allocator.disableTLS false }}
volumes:
- name: tls
Expand Down Expand Up @@ -202,6 +203,9 @@ spec:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: {{ .Values.agones.allocator.readiness.initialDelaySeconds }}
periodSeconds: {{ .Values.agones.allocator.readiness.periodSeconds }}
failureThreshold: {{ .Values.agones.allocator.readiness.failureThreshold }}
env:
{{- if .Values.agones.allocator.service.http.enabled }}
- name: HTTP_PORT
Expand Down Expand Up @@ -247,6 +251,8 @@ spec:
value: {{ .Values.agones.featureGates | quote }}
- name: ALLOCATION_BATCH_WAIT_TIME
value: {{ .Values.agones.allocator.allocationBatchWaitTime | quote }}
- name: READINESS_SHUTDOWN_DURATION
value: {{ mul .Values.agones.allocator.readiness.periodSeconds .Values.agones.extensions.readiness.failureThreshold 2 }}s
ports:
{{- if .Values.agones.allocator.service.http.enabled }}
- name: {{ .Values.agones.allocator.service.http.portName }}
Expand Down
4 changes: 4 additions & 0 deletions install/helm/agones/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ agones:
periodSeconds: 3
failureThreshold: 3
timeoutSeconds: 1
readiness:
initialDelaySeconds: 3
periodSeconds: 3
failureThreshold: 3
tolerations:
- key: "agones.dev/agones-system"
operator: "Equal"
Expand Down
6 changes: 6 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16136,6 +16136,7 @@ spec:
operator: Equal
value: "true"
serviceAccountName: agones-allocator
terminationGracePeriodSeconds: 27
volumes:
- name: tls
secret:
Expand All @@ -16159,6 +16160,9 @@ spec:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 3
periodSeconds: 3
failureThreshold: 3
env:
- name: HTTP_PORT
value: "8443"
Expand Down Expand Up @@ -16200,6 +16204,8 @@ spec:
value: ""
- name: ALLOCATION_BATCH_WAIT_TIME
value: "500ms"
- name: READINESS_SHUTDOWN_DURATION
value: 18s
ports:
- name: https
containerPort: 8443
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/signals/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewSigKillContext() (context.Context, context.CancelFunc) {
// NewSigTermHandler creates a channel to listen to SIGTERM and runs the handle function
func NewSigTermHandler(handle func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-c
Expand Down
92 changes: 92 additions & 0 deletions test/e2e/allocator/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package allocator

import (
"context"
"os"
"strconv"
"testing"
"time"

helper "agones.dev/agones/test/e2e/allochelper"
e2eframework "agones.dev/agones/test/e2e/framework"
log "github.com/sirupsen/logrus"
)

var framework *e2eframework.Framework

func TestMain(m *testing.M) {
log.SetFormatter(&log.TextFormatter{
EnvironmentOverrideColors: true,
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05.000",
})

var (
err error
exitCode int
)

if err = e2eframework.ParseTestFlags(); err != nil {
log.WithError(err).Error("failed to parse go test flags")
os.Exit(1)
}

if framework, err = e2eframework.NewFromFlags(); err != nil {
log.WithError(err).Error("failed to setup framework")
os.Exit(1)
}

if err = helper.CleanupNamespaces(context.Background(), framework); err != nil {
log.WithError(err).Error("failed to cleanup e2e namespaces")
os.Exit(1)
}

if framework.Namespace == "" {
// use a custom namespace - Unix timestamp
framework.Namespace = strconv.Itoa(int(time.Now().Unix()))
log.Infof("Custom namespace is set: %s", framework.Namespace)

if err := framework.CreateNamespace(framework.Namespace); err != nil {
log.WithError(err).Error("failed to create a custom namespace")
os.Exit(1)
}

defer func() {
if derr := framework.DeleteNamespace(framework.Namespace); derr != nil {
log.Error(derr)
}
os.Exit(exitCode)
}()
} else {
// use an already existing namespace
// run cleanup before tests to ensure no resources from previous runs exist
err = framework.CleanUp(framework.Namespace)
if err != nil {
log.WithError(err).Error("failed to cleanup resources")
}

defer func() {
err = framework.CleanUp(framework.Namespace)
if err != nil {
log.WithError(err).Error("failed to cleanup resources")
}
os.Exit(exitCode)
}()
}

exitCode = m.Run()
}
82 changes: 82 additions & 0 deletions test/e2e/allocator/pod_termination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package allocator

import (
"context"
"testing"
"time"

pb "agones.dev/agones/pkg/allocation/go"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
helper "agones.dev/agones/test/e2e/allochelper"
e2e "agones.dev/agones/test/e2e/framework"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
retryInterval = 1 * time.Second
retryTimeout = 30 * time.Second
)

func TestAllocatorAfterDeleteReplica(t *testing.T) {
ctx := context.Background()

// get allocator pods
list, err := helper.GetAgonesAllocatorPods(ctx, framework)
assert.NoError(t, err)

grpcClient, err := helper.GetAllocatorClient(ctx, t, framework)
require.NoError(t, err, "Could not initialize rpc client")

// create fleet
flt, err := helper.CreateFleet(ctx, framework.Namespace, framework)
if !assert.Nil(t, err) {
return
}
framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas))

var response *pb.AllocationResponse
request := &pb.AllocationRequest{
Namespace: framework.Namespace,
PreferredGameServerSelectors: []*pb.GameServerSelector{{MatchLabels: map[string]string{agonesv1.FleetNameLabel: flt.ObjectMeta.Name}}},
Scheduling: pb.AllocationRequest_Packed,
Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest"}},
}

// delete 2 of the allocators
for _, pod := range list.Items[1:] {
err = helper.DeleteAgonesAllocatorPod(ctx, pod.ObjectMeta.Name, framework)
require.NoError(t, err, "Could not delete allocator pod")
}

response, err = grpcClient.Allocate(context.Background(), request)
logrus.Info(response)
helper.ValidateAllocatorResponse(t, response)
require.NoError(t, err, "Failed grpc allocation request")

// Wait and make another allocation call once we know the draining time has passed
_ = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
return false, nil
})

response, err = grpcClient.Allocate(context.Background(), request)
logrus.Info(response)
helper.ValidateAllocatorResponse(t, response)
require.NoError(t, err, "Failed second grpc allocation request")
}
Loading

0 comments on commit d550adb

Please sign in to comment.