Add a more graceful termination to Allocator #3105

Merged
merged 1 commit on May 8, 2023
19 changes: 19 additions & 0 deletions build/Makefile
@@ -303,6 +303,7 @@ test-e2e:
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-integration
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-failure
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-ha-extensions
$(MAKE) DOCKER_RUN_ARGS="$(DOCKER_RUN_ARGS)" test-e2e-allocator-crash


# e2e test args:
@@ -364,6 +365,24 @@ else
endif
echo "Finishing e2e extensions high availability test!"

test-e2e-allocator-crash: FEATURE_GATES ?= $(ALPHA_FEATURE_GATES)
test-e2e-allocator-crash: CLOUD_PRODUCT ?= generic
test-e2e-allocator-crash: GO_E2E_TEST_INTEGRATION_ARGS ?=\
--cloud-product=$(CLOUD_PRODUCT) \
--gameserver-image=$(GS_TEST_IMAGE) \
--feature-gates=$(FEATURE_GATES) \
--pullsecret=$(IMAGE_PULL_SECRET)
test-e2e-allocator-crash: $(ensure-build-image)
echo "Starting e2e allocation pod deletion test!"
ifdef E2E_USE_GOTESTSUM
$(GOTESTSUM) --packages=$(agones_package)/test/e2e/allocator -- $(go_test_args) $(ARGS) -parallel=1 -args \
$(GO_E2E_TEST_ARGS) $(GO_E2E_TEST_INTEGRATION_ARGS)
else
$(GO_TEST) $(ARGS) -parallel=1 $(agones_package)/test/e2e/allocator -args \
$(GO_E2E_TEST_ARGS) $(GO_E2E_TEST_INTEGRATION_ARGS)
endif
echo "Finishing e2e allocation pod deletion test!"

# Runs end-to-end stress tests on the current configured cluster
# For minikube user the minikube-stress-test-e2e targets
stress-test-e2e: $(ensure-build-image)
34 changes: 29 additions & 5 deletions cmd/allocator/main.go
@@ -55,7 +55,10 @@ import (
"k8s.io/client-go/rest"
)

var logger = runtime.NewLoggerWithSource("main")
var (
podReady bool
logger = runtime.NewLoggerWithSource("main")
)

const (
certDir = "/home/allocator/client-ca/"
@@ -77,6 +80,7 @@ const (
apiServerBurstQPSFlag = "api-server-qps-burst"
logLevelFlag = "log-level"
allocationBatchWaitTime = "allocation-batch-wait-time"
readinessShutdownDuration = "readiness-shutdown-duration"
)

func parseEnvFlags() config {
@@ -109,6 +113,7 @@ func parseEnvFlags() config {
pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.")
pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
pflag.Duration(readinessShutdownDuration, viper.GetDuration(readinessShutdownDuration), "Time in seconds for SIGTERM/SIGINT handler to sleep for.")
runtime.FeaturesBindFlags()
pflag.Parse()

@@ -127,6 +132,7 @@ func parseEnvFlags() config {
runtime.Must(viper.BindEnv(totalRemoteAllocationTimeoutFlag))
runtime.Must(viper.BindEnv(logLevelFlag))
runtime.Must(viper.BindEnv(allocationBatchWaitTime))
runtime.Must(viper.BindEnv(readinessShutdownDuration))
runtime.Must(viper.BindPFlags(pflag.CommandLine))
runtime.Must(runtime.FeaturesBindEnv())

@@ -147,6 +153,7 @@ func parseEnvFlags() config {
remoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag),
totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
allocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime),
ReadinessShutdownDuration: viper.GetDuration(readinessShutdownDuration),
}
}

@@ -165,6 +172,7 @@ type config struct {
totalRemoteAllocationTimeout time.Duration
remoteAllocationTimeout time.Duration
allocationBatchWaitTime time.Duration
ReadinessShutdownDuration time.Duration
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
@@ -214,12 +222,29 @@ func main() {
// This will test the connection to agones on each readiness probe
// so if one of the allocator pod can't reach Kubernetes it will be removed
// from the Kubernetes service.
ctx, cancelCtx := context.WithCancel(context.Background())
podReady = true
health.AddReadinessCheck("allocator-agones-client", func() error {
if !podReady {
return errors.New("asked to shut down, failed readiness check")
}
_, err := agonesClient.ServerVersion()
return err
if err != nil {
return fmt.Errorf("failed to reach Kubernetes: %w", err)
}
return nil
})

signals.NewSigTermHandler(func() {
logger.Info("Pod shutdown has been requested, failing readiness check")
podReady = false
time.Sleep(conf.ReadinessShutdownDuration)
cancelCtx()
logger.Infof("Readiness shutdown duration has passed, exiting pod")
os.Exit(0)
})

h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)
h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)

if !h.tlsDisabled {
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
@@ -353,7 +378,7 @@ func runGRPC(h *serviceHandler, grpcPort int) {
}()
}

func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
@@ -369,7 +394,6 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
totalRemoteAllocationTimeout,
allocationBatchWaitTime)

ctx, _ := signals.NewSigKillContext()
h := serviceHandler{
allocationCallback: func(gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) {
return allocator.Allocate(ctx, gsa)
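Taken together, the main.go changes add a drain-before-exit sequence: the SIGTERM/SIGINT handler flips the readiness flag, sleeps for readiness-shutdown-duration so the pod is dropped from the Service endpoints, and only then cancels the allocation context and exits. Below is a minimal sketch of that pattern using the diff's naming; the health-check registration and gRPC/HTTP wiring are simplified assumptions, and an atomic flag stands in for the plain `podReady` bool purely to keep the sketch race-free.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

func main() {
	// READINESS_SHUTDOWN_DURATION, wired from the readiness-shutdown-duration flag.
	readinessShutdownDuration := 18 * time.Second

	// Context handed to the service handler so in-flight allocations are
	// cancelled once draining has finished.
	ctx, cancelCtx := context.WithCancel(context.Background())

	// podReady is a plain bool in the diff; atomic.Bool keeps this sketch race-free.
	var podReady atomic.Bool
	podReady.Store(true)

	// Readiness check as registered on /ready: fail once shutdown is requested.
	readyCheck := func() error {
		if !podReady.Load() {
			return errors.New("asked to shut down, failed readiness check")
		}
		return nil
	}

	// SIGTERM/SIGINT handler: fail the readiness probe, give the endpoint time
	// to drop out of the Service, then cancel work and exit.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-c
		fmt.Println("shutdown requested, failing readiness check")
		podReady.Store(false)
		time.Sleep(readinessShutdownDuration)
		cancelCtx()
		os.Exit(0)
	}()

	// ... start the gRPC/HTTP servers with ctx and readyCheck here ...
	fmt.Println("ready:", readyCheck() == nil)
	<-ctx.Done()
}
```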
6 changes: 6 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
@@ -178,6 +178,7 @@ spec:
{{ toYaml .Values.agones.allocator.tolerations | indent 8 }}
{{- end }}
serviceAccountName: {{ $.Values.agones.serviceaccount.allocator.name }}
terminationGracePeriodSeconds: {{ mul .Values.agones.allocator.readiness.periodSeconds .Values.agones.allocator.readiness.failureThreshold 3 }}
{{- if eq .Values.agones.allocator.disableTLS false }}
volumes:
- name: tls
@@ -205,6 +206,9 @@ spec:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: {{ .Values.agones.allocator.readiness.initialDelaySeconds }}
periodSeconds: {{ .Values.agones.allocator.readiness.periodSeconds }}
failureThreshold: {{ .Values.agones.allocator.readiness.failureThreshold }}
env:
{{- if .Values.agones.allocator.service.http.enabled }}
- name: HTTP_PORT
@@ -250,6 +254,8 @@ spec:
value: {{ .Values.agones.featureGates | quote }}
- name: ALLOCATION_BATCH_WAIT_TIME
value: {{ .Values.agones.allocator.allocationBatchWaitTime | quote }}
- name: READINESS_SHUTDOWN_DURATION
value: {{ mul .Values.agones.allocator.readiness.periodSeconds .Values.agones.extensions.readiness.failureThreshold 2 }}s
ports:
{{- if .Values.agones.allocator.service.http.enabled }}
- name: {{ .Values.agones.allocator.service.http.portName }}
4 changes: 4 additions & 0 deletions install/helm/agones/values.yaml
@@ -173,6 +173,10 @@ agones:
periodSeconds: 3
failureThreshold: 3
timeoutSeconds: 1
readiness:
initialDelaySeconds: 3
periodSeconds: 3
failureThreshold: 3
tolerations:
- key: "agones.dev/agones-system"
operator: "Equal"
6 changes: 6 additions & 0 deletions install/yaml/install.yaml
@@ -16136,6 +16136,7 @@ spec:
operator: Equal
value: "true"
serviceAccountName: agones-allocator
terminationGracePeriodSeconds: 27
volumes:
- name: tls
secret:
@@ -16159,6 +16160,9 @@ spec:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 3
periodSeconds: 3
failureThreshold: 3
env:
- name: HTTP_PORT
value: "8443"
@@ -16200,6 +16204,8 @@ spec:
value: ""
- name: ALLOCATION_BATCH_WAIT_TIME
value: "500ms"
- name: READINESS_SHUTDOWN_DURATION
value: 18s
ports:
- name: https
containerPort: 8443
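A note on the rendered defaults, as read from the chart changes above: with the new readiness settings of periodSeconds: 3 and failureThreshold: 3, READINESS_SHUTDOWN_DURATION is templated as periodSeconds × failureThreshold × 2 = 18s and terminationGracePeriodSeconds as periodSeconds × failureThreshold × 3 = 27, which matches the generated install.yaml. The intent appears to be that a terminating allocator fails its readiness probe, is removed from the Service endpoints, and finishes its 18s drain comfortably inside the 27s grace period before the kubelet sends SIGKILL.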
2 changes: 1 addition & 1 deletion pkg/util/signals/signals.go
@@ -32,7 +32,7 @@ func NewSigKillContext() (context.Context, context.CancelFunc) {
// NewSigTermHandler creates a channel to listen to SIGTERM and runs the handle function
func NewSigTermHandler(handle func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-c
92 changes: 92 additions & 0 deletions test/e2e/allocator/main_test.go
@@ -0,0 +1,92 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package allocator

import (
"context"
"os"
"strconv"
"testing"
"time"

helper "agones.dev/agones/test/e2e/allochelper"
e2eframework "agones.dev/agones/test/e2e/framework"
log "github.com/sirupsen/logrus"
)

var framework *e2eframework.Framework

func TestMain(m *testing.M) {
log.SetFormatter(&log.TextFormatter{
EnvironmentOverrideColors: true,
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05.000",
})

var (
err error
exitCode int
)

if err = e2eframework.ParseTestFlags(); err != nil {
log.WithError(err).Error("failed to parse go test flags")
os.Exit(1)
}

if framework, err = e2eframework.NewFromFlags(); err != nil {
log.WithError(err).Error("failed to setup framework")
os.Exit(1)
}

if err = helper.CleanupNamespaces(context.Background(), framework); err != nil {
log.WithError(err).Error("failed to cleanup e2e namespaces")
os.Exit(1)
}

if framework.Namespace == "" {
// use a custom namespace - Unix timestamp
framework.Namespace = strconv.Itoa(int(time.Now().Unix()))
log.Infof("Custom namespace is set: %s", framework.Namespace)

if err := framework.CreateNamespace(framework.Namespace); err != nil {
log.WithError(err).Error("failed to create a custom namespace")
os.Exit(1)
}

defer func() {
if derr := framework.DeleteNamespace(framework.Namespace); derr != nil {
log.Error(derr)
}
os.Exit(exitCode)
}()
} else {
// use an already existing namespace
// run cleanup before tests to ensure no resources from previous runs exist
err = framework.CleanUp(framework.Namespace)
if err != nil {
log.WithError(err).Error("failed to cleanup resources")
}

defer func() {
err = framework.CleanUp(framework.Namespace)
if err != nil {
log.WithError(err).Error("failed to cleanup resources")
}
os.Exit(exitCode)
}()
}

exitCode = m.Run()
}
95 changes: 95 additions & 0 deletions test/e2e/allocator/pod_termination_test.go
@@ -0,0 +1,95 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package allocator

import (
"context"
"testing"
"time"

pb "agones.dev/agones/pkg/allocation/go"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
helper "agones.dev/agones/test/e2e/allochelper"
e2e "agones.dev/agones/test/e2e/framework"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
retryInterval = 5 * time.Second
retryTimeout = 45 * time.Second
)

func TestAllocatorAfterDeleteReplica(t *testing.T) {
ctx := context.Background()

var list *v1.PodList

// poll and wait until all allocator pods are running
_ = wait.PollImmediate(retryInterval, retryTimeout, func() (done bool, err error) {
list, err = helper.GetAgonesAllocatorPods(ctx, framework)
if err != nil {
return true, err
}

for _, allocpod := range list.Items {
podstatus := string(allocpod.Status.Phase)
logrus.Infof("Allocator Pod %s, has status of %s", allocpod.ObjectMeta.Name, podstatus)
if podstatus != "Running" {
return false, nil
}
}

return true, nil
})

grpcClient, err := helper.GetAllocatorClient(ctx, t, framework)
require.NoError(t, err, "Could not initialize rpc client")

// create fleet
flt, err := helper.CreateFleet(ctx, framework.Namespace, framework)
if !assert.Nil(t, err) {
return
}
framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas))

var response *pb.AllocationResponse
request := &pb.AllocationRequest{
Namespace: framework.Namespace,
PreferredGameServerSelectors: []*pb.GameServerSelector{{MatchLabels: map[string]string{agonesv1.FleetNameLabel: flt.ObjectMeta.Name}}},
Scheduling: pb.AllocationRequest_Packed,
Metadata: &pb.MetaPatch{Labels: map[string]string{"gslabel": "allocatedbytest"}},
}

// delete all of the allocators except 1
for _, pod := range list.Items[1:] {
err = helper.DeleteAgonesAllocatorPod(ctx, pod.ObjectMeta.Name, framework)
require.NoError(t, err, "Could not delete allocator pod")
}

// Wait and keep making calls till we know the draining time has passed
_ = wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
response, err = grpcClient.Allocate(context.Background(), request)
logrus.Info(response)
helper.ValidateAllocatorResponse(t, response)
require.NoError(t, err, "Failed grpc allocation request")
err = helper.DeleteAgonesPod(ctx, response.GameServerName, framework.Namespace, framework)
require.NoError(t, err, "Failed to delete game server pod %s", response.GameServerName)
return false, nil
})
}
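As read from the test above, the closing poll in TestAllocatorAfterDeleteReplica deliberately returns false, nil on every iteration, so wait.PollImmediate keeps issuing allocation requests for the full retryTimeout (45s) instead of stopping at the first success. That keeps traffic flowing through the drain window opened by deleting all but one allocator pod, and every allocation made during that window must still validate and succeed.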