Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agones-{extensions,allocator}: Make servers context aware #3845

Merged
merged 6 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 56 additions & 25 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
grpchealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -52,6 +54,7 @@ import (
"agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/fswatch"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -218,19 +221,18 @@ func main() {
health, closer := setupMetricsRecorder(conf)
defer closer()

// http.DefaultServerMux is used for http connection, not for https
http.Handle("/", health)

kubeClient, agonesClient, err := getClients(conf)
if err != nil {
logger.WithError(err).Fatal("could not create clients")
}

listenCtx, cancelListenCtx := context.WithCancel(context.Background())

// This will test the connection to agones on each readiness probe
// so if one of the allocator pod can't reach Kubernetes it will be removed
// from the Kubernetes service.
ctx, cancelCtx := context.WithCancel(context.Background())
podReady = true
grpcHealth := grpchealth.NewServer() // only used for gRPC, ignored o/w
health.AddReadinessCheck("allocator-agones-client", func() error {
if !podReady {
return errors.New("asked to shut down, failed readiness check")
Expand All @@ -245,16 +247,15 @@ func main() {
signals.NewSigTermHandler(func() {
logger.Info("Pod shutdown has been requested, failing readiness check")
podReady = false
grpcHealth.Shutdown()
time.Sleep(conf.ReadinessShutdownDuration)
cancelCtx()
logger.Infof("Readiness shutdown duration has passed, context cancelled")
time.Sleep(1 * time.Second) // allow a brief time for cleanup, but force exit if main doesn't
os.Exit(0)
cancelListenCtx()
})

grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.httpUnallocatedStatusCode)

h := newServiceHandler(ctx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)
workerCtx, cancelWorkerCtx := context.WithCancel(context.Background())
h := newServiceHandler(workerCtx, kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime, grpcUnallocatedStatusCode)

if !h.tlsDisabled {
cancelTLS, err := fswatch.Watch(logger, tlsDir, time.Second, func() {
Expand Down Expand Up @@ -294,51 +295,62 @@ func main() {

// If grpc and http use the same port then use a mux.
if conf.GRPCPort == conf.HTTPPort {
runMux(h, conf.HTTPPort)
runMux(listenCtx, workerCtx, h, grpcHealth, conf.HTTPPort)
} else {
// Otherwise, run each on a dedicated port.
if validPort(conf.HTTPPort) {
runREST(h, conf.HTTPPort)
runREST(listenCtx, workerCtx, h, conf.HTTPPort)
}
if validPort(conf.GRPCPort) {
runGRPC(h, conf.GRPCPort)
runGRPC(listenCtx, h, grpcHealth, conf.GRPCPort)
}
}

// Finally listen on 8080 (http) and block the main goroutine
// this is used to serve /live and /ready handlers for Kubernetes probes.
err = http.ListenAndServe(":8080", http.DefaultServeMux)
logger.WithError(err).Fatal("allocation service crashed")
// Finally listen on 8080 (http), used to serve /live and /ready handlers for Kubernetes probes.
healthserver := httpserver.Server{Logger: logger}
healthserver.Handle("/", health)
go func() { _ = healthserver.Run(listenCtx, 0) }()

// TODO: This is messy. Contexts are the wrong way to handle this - we should be using shutdown,
// and a cascading graceful shutdown instead of multiple contexts and sleeps.
<-listenCtx.Done()
logger.Infof("Listen context cancelled")
time.Sleep(5 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this is because we don't actually know that the gRPC/HTTP servers have gracefully shut down yet, so we just wait for a bit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah .. fully knowing they're done takes a bit more plumbing.

cancelWorkerCtx()
logger.Infof("Worker context cancelled")
time.Sleep(1 * time.Second)
logger.Info("Shut down allocator")
}

func validPort(port int) bool {
const maxPort = 65535
return port >= 0 && port < maxPort
}

func runMux(h *serviceHandler, httpPort int) {
func runMux(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, httpPort int) {
logger.Infof("Running the mux handler on port %d", httpPort)
grpcServer := grpc.NewServer(h.getMuxServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)

mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}

runHTTP(h, httpPort, grpcHandlerFunc(grpcServer, mux))
runHTTP(listenCtx, workerCtx, h, httpPort, grpcHandlerFunc(grpcServer, mux))
}

func runREST(h *serviceHandler, httpPort int) {
func runREST(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int) {
logger.WithField("port", httpPort).Info("Running the rest handler")
mux := runtime.NewServerMux()
if err := pb.RegisterAllocationServiceHandlerServer(context.Background(), mux, h); err != nil {
panic(err)
}
runHTTP(h, httpPort, mux)
runHTTP(listenCtx, workerCtx, h, httpPort, mux)
}

func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
func runHTTP(listenCtx context.Context, workerCtx context.Context, h *serviceHandler, httpPort int, handler http.Handler) {
cfg := &tls.Config{}
if !h.tlsDisabled {
cfg.GetCertificate = h.getTLSCert
Expand All @@ -356,21 +368,29 @@ func runHTTP(h *serviceHandler, httpPort int, handler http.Handler) {
}

go func() {
go func() {
<-listenCtx.Done()
_ = server.Shutdown(workerCtx)
}()

var err error
if !h.tlsDisabled {
err = server.ListenAndServeTLS("", "")
} else {
err = server.ListenAndServe()
}

if err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("HTTP/HTTPS server closed")
os.Exit(0)
} else {
logger.WithError(err).Fatal("Unable to start HTTP/HTTPS listener")
os.Exit(1)
}
}()
}

func runGRPC(h *serviceHandler, grpcPort int) {
func runGRPC(ctx context.Context, h *serviceHandler, grpcHealth *grpchealth.Server, grpcPort int) {
logger.WithField("port", grpcPort).Info("Running the grpc handler on port")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
Expand All @@ -380,11 +400,22 @@ func runGRPC(h *serviceHandler, grpcPort int) {

grpcServer := grpc.NewServer(h.getGRPCServerOptions()...)
pb.RegisterAllocationServiceServer(grpcServer, h)
grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth)

go func() {
go func() {
<-ctx.Done()
grpcServer.GracefulStop()
}()

err := grpcServer.Serve(listener)
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
if err != nil {
logger.WithError(err).Fatal("allocation service crashed")
os.Exit(1)
} else {
logger.Info("allocation server closed")
os.Exit(0)
}
}()
}

Expand Down
27 changes: 2 additions & 25 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -54,6 +53,7 @@ import (
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/gameserversets"
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
)
Expand Down Expand Up @@ -171,7 +171,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
var rs []runner
var health healthcheck.Handler

Expand Down Expand Up @@ -547,10 +547,6 @@ type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, doLeaderElection bool, kubeClient *kubernetes.Clientset, namespace string, start func(_ context.Context)) {
if !doLeaderElection {
start(ctx)
Expand Down Expand Up @@ -600,22 +596,3 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E
},
})
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
27 changes: 2 additions & 25 deletions cmd/extensions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main
import (
"context"
"io"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -46,6 +45,7 @@ import (
"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/apiserver"
"agones.dev/agones/pkg/util/https"
"agones.dev/agones/pkg/util/httpserver"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/signals"
"agones.dev/agones/pkg/util/webhooks"
Expand Down Expand Up @@ -150,7 +150,7 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

server := &httpServer{}
server := &httpserver.Server{Logger: logger}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
server := &httpserver.Server{Logger: logger}
server := &http.Server{Logger: logger}

That wouldn't be redundant 😃

var health healthcheck.Handler

// Stackdriver metrics
Expand Down Expand Up @@ -340,26 +340,3 @@ type config struct {
type runner interface {
Run(ctx context.Context, workers int) error
}

type httpServer struct {
http.ServeMux
}

func (h *httpServer) Run(_ context.Context, _ int) error {
logger.Info("Starting http server...")
srv := &http.Server{
Addr: ":8080",
Handler: h,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}
return nil
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
contrib.go.opencensus.io/exporter/stackdriver v0.8.0
fortio.org/fortio v1.3.1
github.com/ahmetb/gen-crd-api-reference-docs v0.3.0
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/go-openapi/spec v0.19.5
github.com/google/go-cmp v0.5.9
Expand All @@ -23,7 +24,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cast v1.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.8.2
Expand Down Expand Up @@ -60,7 +60,6 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down Expand Up @@ -92,6 +91,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/https/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (

// tls is a http server interface to enable easier testing
type tls interface {
Close() error
Shutdown(context.Context) error
ListenAndServeTLS(certFile, keyFile string) error
}

Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *Server) WatchForCertificateChanges() (func(), error) {
func (s *Server) Run(ctx context.Context, _ int) error {
go func() {
<-ctx.Done()
s.tls.Close() // nolint: errcheck,gosec
_ = s.tls.Shutdown(context.Background())
}()

s.logger.WithField("server", s).Infof("https server started")
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/https/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type testServer struct {
server *httptest.Server
}

func (ts *testServer) Close() error {
func (ts *testServer) Shutdown(_ context.Context) error {
ts.server.Close()
return nil
}
Expand Down
Loading
Loading