diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 1cb2ece548..45a11c0dc9 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -24,13 +24,19 @@ import ( "os" "path/filepath" "strings" + "time" "agones.dev/agones/pkg" "agones.dev/agones/pkg/allocation/converters" pb "agones.dev/agones/pkg/allocation/go/v1alpha1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/client/informers/externalversions" + "agones.dev/agones/pkg/gameserverallocations" + "agones.dev/agones/pkg/gameservers" "agones.dev/agones/pkg/metrics" "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/signals" "github.com/heptiolabs/healthcheck" prom "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" @@ -38,6 +44,10 @@ import ( "go.opencensus.io/plugin/ochttp" "go.opencensus.io/stats/view" k8serror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -75,10 +85,11 @@ func main() { // http.DefaultServerMux is used for http connection, not for https http.Handle("/", health) - agonesClient, err := getAgonesClient() + kubeClient, agonesClient, err := getClients() if err != nil { - logger.WithError(err).Fatal("could not create agones client") + logger.WithError(err).Fatal("could not create clients") } + // This will test the connection to agones on each readiness probe // so if one of the allocator pod can't reach Kubernetes it will be removed // from the Kubernetes service. @@ -87,9 +98,7 @@ func main() { return err }) - h := httpHandler{ - agonesClient: agonesClient, - } + h := newServiceHandler(kubeClient, agonesClient, health) // mux for https server to serve gameserver allocations httpsMux := http.NewServeMux() @@ -126,20 +135,53 @@ func main() { logger.WithError(err).Fatal("allocation service crashed") } +func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler) *httpHandler { + defaultResync := 30 * time.Second + agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) + kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) + gsCounter := gameservers.NewPerNodeCounter(kubeInformerFactory, agonesInformerFactory) + h := httpHandler{ + allocator: gameserverallocations.NewAllocator( + agonesInformerFactory.Multicluster().V1alpha1().GameServerAllocationPolicies(), + kubeInformerFactory.Core().V1().Secrets(), + kubeClient, + gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health)), + stop: signals.NewStopChannel(), + } + + h.allocationCallback = func(gsa *allocationv1.GameServerAllocation, stop <-chan struct{}) (k8sruntime.Object, error) { + return h.allocator.Allocate(gsa, stop) + } + + kubeInformerFactory.Start(h.stop) + agonesInformerFactory.Start(h.stop) + if err := h.allocator.Start(h.stop); err != nil { + logger.WithError(err).Fatal("starting allocator failed.") + } + + return &h +} + // Set up our client which we will use to call the API -func getAgonesClient() (*versioned.Clientset, error) { +func getClients() (*kubernetes.Clientset, *versioned.Clientset, error) { // Create the in-cluster config config, err := rest.InClusterConfig() if err != nil { - return nil, errors.New("Could not create in cluster config") + return nil, nil, errors.New("Could not create in cluster config") + } + + // Access to the Agones resources through the Agones Clientset + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, nil, errors.New("Could not create the kubernetes api clientset") } // Access to the Agones resources through the Agones Clientset agonesClient, err := versioned.NewForConfig(config) if err != nil { - return nil, errors.New("Could not create the agones api clientset") + return nil, nil, errors.New("Could not create the agones api clientset") } - return agonesClient, nil + return kubeClient, agonesClient, nil } func getCACertPool(path string) (*x509.CertPool, error) { @@ -180,7 +222,9 @@ func (h *httpHandler) postOnly(in handler) handler { } type httpHandler struct { - agonesClient versioned.Interface + allocator *gameserverallocations.Allocator + stop <-chan struct{} + allocationCallback func(*allocationv1.GameServerAllocation, <-chan struct{}) (k8sruntime.Object, error) } func (h *httpHandler) allocateHandler(w http.ResponseWriter, r *http.Request) { @@ -193,18 +237,32 @@ func (h *httpHandler) allocateHandler(w http.ResponseWriter, r *http.Request) { logger.WithField("request", request).Infof("allocation request received") gsa := converters.ConvertAllocationRequestV1Alpha1ToGSAV1(&request) - allocation := h.agonesClient.AllocationV1().GameServerAllocations(gsa.ObjectMeta.Namespace) - allocatedGsa, err := allocation.Create(gsa) + resultObj, err := h.allocationCallback(gsa, h.stop) if err != nil { http.Error(w, err.Error(), httpCode(err)) logger.WithField("gsa", gsa).WithError(err).Info("calling allocation extension API failed") return } + w.Header().Set("Content-Type", "application/json") + if status, ok := resultObj.(*metav1.Status); ok { + w.WriteHeader(int(status.Code)) + err = json.NewEncoder(w).Encode(status) + if err != nil { + http.Error(w, "internal server error", http.StatusInternalServerError) + logger.Error(err) + return + } + } + allocatedGsa, ok := resultObj.(*allocationv1.GameServerAllocation) + if !ok { + http.Error(w, "internal server error", http.StatusInternalServerError) + logger.Errorf("internal server error - Bad GSA format %v", resultObj) + return + } response := converters.ConvertGSAV1ToAllocationResponseV1Alpha1(allocatedGsa) logger.WithField("response", response).Infof("allocation response is being sent") - w.Header().Set("Content-Type", "application/json") err = json.NewEncoder(w).Encode(response) if err != nil { http.Error(w, "internal server error", http.StatusInternalServerError) diff --git a/cmd/allocator/main_test.go b/cmd/allocator/main_test.go index ce33a23232..a1cabbafa2 100644 --- a/cmd/allocator/main_test.go +++ b/cmd/allocator/main_test.go @@ -24,33 +24,29 @@ import ( pb "agones.dev/agones/pkg/allocation/go/v1alpha1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" - agonesfake "agones.dev/agones/pkg/client/clientset/versioned/fake" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" k8serror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" - k8stesting "k8s.io/client-go/testing" ) func TestAllocateHandler(t *testing.T) { t.Parallel() - fakeAgones := &agonesfake.Clientset{} h := httpHandler{ - agonesClient: fakeAgones, + allocationCallback: func(gsa *allocationv1.GameServerAllocation, stop <-chan struct{}) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + }, + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationContention, + }, + }, nil + }, } - fakeAgones.AddReactor("create", "gameserverallocations", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { - return true, &allocationv1.GameServerAllocation{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - }, - Status: allocationv1.GameServerAllocationStatus{ - State: allocationv1.GameServerAllocationContention, - }, - }, nil - }) - request := &pb.AllocationRequest{ Namespace: "ns", MultiClusterSetting: &pb.MultiClusterSetting{ @@ -78,15 +74,12 @@ func TestAllocateHandler(t *testing.T) { func TestAllocateHandlerReturnsError(t *testing.T) { t.Parallel() - fakeAgones := &agonesfake.Clientset{} h := httpHandler{ - agonesClient: fakeAgones, + allocationCallback: func(gsa *allocationv1.GameServerAllocation, stop <-chan struct{}) (k8sruntime.Object, error) { + return nil, k8serror.NewBadRequest("error") + }, } - fakeAgones.AddReactor("create", "gameserverallocations", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { - return true, nil, k8serror.NewBadRequest("error") - }) - request := &pb.AllocationRequest{} body, _ := json.Marshal(request) buf := bytes.NewBuffer(body) @@ -101,6 +94,62 @@ func TestAllocateHandlerReturnsError(t *testing.T) { assert.Contains(t, rec.Body.String(), "error") } +func TestHandlingStatus(t *testing.T) { + t.Parallel() + + errorMessage := "GameServerAllocation is invalid" + h := httpHandler{ + allocationCallback: func(gsa *allocationv1.GameServerAllocation, stop <-chan struct{}) (k8sruntime.Object, error) { + return &metav1.Status{ + Status: metav1.StatusFailure, + Message: errorMessage, + Reason: metav1.StatusReasonInvalid, + Details: &metav1.StatusDetails{ + Kind: "GameServerAllocation", + Group: allocationv1.SchemeGroupVersion.Group, + }, + Code: http.StatusUnprocessableEntity, + }, nil + }, + } + + request := &pb.AllocationRequest{} + body, _ := json.Marshal(request) + buf := bytes.NewBuffer(body) + req, err := http.NewRequest(http.MethodPost, "/", buf) + if !assert.Nil(t, err) { + return + } + + rec := httptest.NewRecorder() + h.allocateHandler(rec, req) + assert.Equal(t, rec.Code, 422) + assert.Contains(t, rec.Body.String(), errorMessage) +} + +func TestBadReturnType(t *testing.T) { + t.Parallel() + + h := httpHandler{ + allocationCallback: func(gsa *allocationv1.GameServerAllocation, stop <-chan struct{}) (k8sruntime.Object, error) { + return &corev1.Secret{}, nil + }, + } + + request := &pb.AllocationRequest{} + body, _ := json.Marshal(request) + buf := bytes.NewBuffer(body) + req, err := http.NewRequest(http.MethodPost, "/", buf) + if !assert.Nil(t, err) { + return + } + + rec := httptest.NewRecorder() + h.allocateHandler(rec, req) + assert.Equal(t, rec.Code, 500) + assert.Contains(t, rec.Body.String(), "internal server error") +} + func TestGettingCaCert(t *testing.T) { t.Parallel() diff --git a/install/helm/agones/templates/service/allocation.yaml b/install/helm/agones/templates/service/allocation.yaml index e2488efbcb..04b7859de4 100644 --- a/install/helm/agones/templates/service/allocation.yaml +++ b/install/helm/agones/templates/service/allocation.yaml @@ -136,9 +136,24 @@ metadata: release: {{ $.Release.Name }} heritage: {{ $.Release.Service }} rules: +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch"] - apiGroups: ["allocation.agones.dev"] resources: ["gameserverallocations"] verbs: ["create"] +- apiGroups: [""] + resources: ["nodes", "secrets"] + verbs: ["get", "list", "watch"] +- apiGroups: ["agones.dev"] + resources: ["gameservers", "gameserversets"] + verbs: ["get", "list", "update", "watch"] +- apiGroups: ["agones.dev"] + resources: ["gameservers"] + verbs: ["patch"] +- apiGroups: ["multicluster.agones.dev"] + resources: ["gameserverallocationpolicies"] + verbs: ["get", "list", "watch"] --- # Create a ServiceAccount that will be bound to the above role diff --git a/install/yaml/install.yaml b/install/yaml/install.yaml index 10690cfe10..c528c19ee9 100644 --- a/install/yaml/install.yaml +++ b/install/yaml/install.yaml @@ -1174,9 +1174,24 @@ metadata: release: agones-manual heritage: Tiller rules: +- apiGroups: [""] + resources: ["events"] + verbs: ["create", "patch"] - apiGroups: ["allocation.agones.dev"] resources: ["gameserverallocations"] verbs: ["create"] +- apiGroups: [""] + resources: ["nodes", "secrets"] + verbs: ["get", "list", "watch"] +- apiGroups: ["agones.dev"] + resources: ["gameservers", "gameserversets"] + verbs: ["get", "list", "update", "watch"] +- apiGroups: ["agones.dev"] + resources: ["gameservers"] + verbs: ["patch"] +- apiGroups: ["multicluster.agones.dev"] + resources: ["gameserverallocationpolicies"] + verbs: ["get", "list", "watch"] --- # Create a ServiceAccount that will be bound to the above role