diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 39fcb40dbe..f0b30c492e 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -16,10 +16,10 @@ package main

 import (
-	"strings"
-	"time"
 	"os"
 	"path/filepath"
+	"strings"
+	"time"

 	"agones.dev/agones/pkg"
 	"agones.dev/agones/pkg/client/clientset/versioned"
@@ -28,7 +28,6 @@ import (
 	"agones.dev/agones/pkg/util/runtime"
 	"agones.dev/agones/pkg/util/signals"
 	"agones.dev/agones/pkg/util/webhooks"
-	"github.com/sirupsen/logrus"
 	"github.com/spf13/pflag"
 	"github.com/spf13/viper"
 	extclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@@ -46,15 +45,15 @@ const (
 	keyFileFlag = "key-file"
 )

-func init() {
-	logrus.SetFormatter(&logrus.JSONFormatter{})
-}
+var (
+	logger = runtime.NewLoggerWithSource("main")
+)

 // main starts the operator for the gameserver CRD
 func main() {
 	exec, err := os.Executable()
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not get executable path")
+		logger.WithError(err).Fatal("Could not get executable path")
 	}

 	base := filepath.Dir(exec)
@@ -87,7 +86,7 @@ func main() {
 	keyFile := viper.GetString(keyFileFlag)
 	certFile := viper.GetString(certFileFlag)

-	logrus.WithField(sidecarFlag, sidecarImage).
+	logger.WithField(sidecarFlag, sidecarImage).
 		WithField("minPort", minPort).
 		WithField("maxPort", maxPort).
 		WithField(keyFileFlag, keyFile).
@@ -96,29 +95,29 @@ func main() {
 		WithField("Version", pkg.Version).Info("starting gameServer operator...")

 	if minPort <= 0 || maxPort <= 0 {
-		logrus.Fatal("Min Port and Max Port values are required.")
+		logger.Fatal("Min Port and Max Port values are required.")
 	} else if maxPort < minPort {
-		logrus.Fatal("Max Port cannot be set less that the Min Port")
+		logger.Fatal("Max Port cannot be set less that the Min Port")
 	}

 	config, err := rest.InClusterConfig()
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create in cluster config")
+		logger.WithError(err).Fatal("Could not create in cluster config")
 	}

 	kubeClient, err := kubernetes.NewForConfig(config)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create the kubernetes clientset")
+		logger.WithError(err).Fatal("Could not create the kubernetes clientset")
 	}

 	extClient, err := extclientset.NewForConfig(config)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create the api extension clientset")
+		logger.WithError(err).Fatal("Could not create the api extension clientset")
 	}

 	agonesClient, err := versioned.NewForConfig(config)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create the agones api clientset")
+		logger.WithError(err).Fatal("Could not create the agones api clientset")
 	}

 	agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, 30*time.Second)
@@ -134,14 +133,14 @@ func main() {

 	go func() {
 		if err := wh.Run(stop); err != nil { // nolint: vetshadow
-			logrus.WithError(err).Fatal("could not run webhook server")
+			logger.WithError(err).Fatal("could not run webhook server")
 		}
 	}()

 	err = c.Run(2, stop)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not run gameserver controller")
+		logger.WithError(err).Fatal("Could not run gameserver controller")
 	}

-	logrus.Info("Shut down gameserver controller")
+	logger.Info("Shut down gameserver controller")
 }
diff --git a/cmd/sdk-server/main.go b/cmd/sdk-server/main.go
index c0d5035e1a..441884b93f 100644
--- a/cmd/sdk-server/main.go
+++ b/cmd/sdk-server/main.go
@@ -26,7 +26,6 @@ import (
 	"agones.dev/agones/pkg/gameservers"
 	"agones.dev/agones/pkg/sdk"
 	"agones.dev/agones/pkg/util/runtime"
-	"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus" "github.com/spf13/pflag" "github.com/spf13/viper" "golang.org/x/net/context" @@ -51,9 +50,9 @@ const ( healthFailureThresholdFlag = "health-failure-threshold" ) -func init() { - logrus.SetFormatter(&logrus.JSONFormatter{}) -} +var ( + logger = runtime.NewLoggerWithSource("main") +) func main() { viper.SetDefault(localFlag, false) @@ -92,7 +91,7 @@ func main() { healthInitialDelay := time.Duration(viper.GetInt64(healthInitialDelayFlag)) * time.Second healthFailureThreshold := viper.GetInt64(healthFailureThresholdFlag) - logrus.WithField(localFlag, isLocal).WithField("version", pkg.Version). + logger.WithField(localFlag, isLocal).WithField("version", pkg.Version). WithField("port", port).WithField(addressFlag, address). WithField(healthDisabledFlag, healthDisabled).WithField(healthTimeoutFlag, healthTimeout). WithField(healthFailureThresholdFlag, healthFailureThreshold). @@ -100,7 +99,7 @@ func main() { lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) if err != nil { - logrus.WithField("port", port).WithField("address", address).Fatalf("Could not listen on port") + logger.WithField("port", port).WithField("address", address).Fatalf("Could not listen on port") } grpcServer := grpc.NewServer() @@ -109,24 +108,24 @@ func main() { } else { config, err := rest.InClusterConfig() if err != nil { - logrus.WithError(err).Fatal("Could not create in cluster config") + logger.WithError(err).Fatal("Could not create in cluster config") } kubeClient, err := kubernetes.NewForConfig(config) if err != nil { - logrus.WithError(err).Fatal("Could not create the kubernetes clientset") + logger.WithError(err).Fatal("Could not create the kubernetes clientset") } agonesClient, err := versioned.NewForConfig(config) if err != nil { - logrus.WithError(err).Fatalf("Could not create the agones api clientset") + logger.WithError(err).Fatalf("Could not create the agones api clientset") } var s *gameservers.SDKServer s, err = gameservers.NewSDKServer(viper.GetString(gameServerNameEnv), viper.GetString(podNamespaceEnv), healthDisabled, healthTimeout, healthFailureThreshold, healthInitialDelay, kubeClient, agonesClient) if err != nil { - logrus.WithError(err).Fatalf("Could not start sidecar") + logger.WithError(err).Fatalf("Could not start sidecar") } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -137,6 +136,6 @@ func main() { err = grpcServer.Serve(lis) if err != nil { - logrus.WithError(err).Error("Could not serve grpc server") + logger.WithError(err).Error("Could not serve grpc server") } } diff --git a/pkg/apis/stable/v1alpha1/types_test.go b/pkg/apis/stable/v1alpha1/types_test.go index 264fb148d1..97e52c4dea 100644 --- a/pkg/apis/stable/v1alpha1/types_test.go +++ b/pkg/apis/stable/v1alpha1/types_test.go @@ -18,7 +18,6 @@ import ( "testing" "agones.dev/agones/pkg/apis/stable" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -218,7 +217,6 @@ func TestGameServerPod(t *testing.T) { assert.Equal(t, fixture.Spec.HostPort, pod.Spec.Containers[0].Ports[0].HostPort) assert.Equal(t, fixture.Spec.ContainerPort, pod.Spec.Containers[0].Ports[0].ContainerPort) assert.Equal(t, corev1.Protocol("UDP"), pod.Spec.Containers[0].Ports[0].Protocol) - logrus.SetFormatter(&logrus.JSONFormatter{}) assert.True(t, metav1.IsControlledBy(pod, fixture)) sidecar := corev1.Container{Name: "sidecar", Image: "container/sidecar"} diff --git a/pkg/gameservers/controller.go 
diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go
index f4267f2c67..8e291e2cd1 100644
--- a/pkg/gameservers/controller.go
+++ b/pkg/gameservers/controller.go
@@ -57,6 +57,7 @@ var (

 // Controller is a the main GameServer crd controller
 type Controller struct {
+	logger                 *logrus.Entry
 	sidecarImage           string
 	alwaysPullSidecarImage bool
 	crdGetter              v1beta1.CustomResourceDefinitionInterface
@@ -90,11 +91,6 @@ func NewController(
 	gameServers := agonesInformerFactory.Stable().V1alpha1().GameServers()
 	gsInformer := gameServers.Informer()

-	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartLogging(logrus.Infof)
-	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})
-
 	c := &Controller{
 		sidecarImage:           sidecarImage,
 		alwaysPullSidecarImage: alwaysPullSidecarImage,
@@ -108,9 +104,15 @@ func NewController(
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".GameServerController"),
 		portAllocator:    NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory),
 		healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
-		recorder:         recorder,
 	}

+	c.logger = runtime.NewLoggerWithType(c)
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(c.logger.Infof)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})
+
 	wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationHandler)

 	gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -143,7 +145,7 @@ func NewController(
 	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 		_, err := w.Write([]byte("ok"))
 		if err != nil {
-			logrus.WithError(err).Error("could not send ok response on healthz")
+			c.logger.WithError(err).Error("could not send ok response on healthz")
 			w.WriteHeader(http.StatusInternalServerError)
 		}
 	})
@@ -160,7 +162,7 @@ func NewController(
 // the default values on the GameServer, and validates the results
 // Should only be called on gameserver create operations.
 func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) {
-	logrus.WithField("review", review).Info("creationHandler")
+	c.logger.WithField("review", review).Info("creationHandler")

 	obj := review.Request.Object
 	gs := &stablev1alpha1.GameServer{}
@@ -188,7 +190,7 @@ func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1be
 			Details: &details,
 		}

-		logrus.WithField("review", review).Info("Invalid GameServer")
+		c.logger.WithField("review", review).Info("Invalid GameServer")
 		return review, nil
 	}

@@ -207,7 +209,7 @@ func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1be
 		return review, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name)
 	}

-	logrus.WithField("gs", gs.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!")
+	c.logger.WithField("gs", gs.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!")

 	pt := admv1beta1.PatchTypeJSONPatch
 	review.Response.PatchType = &pt
@@ -221,14 +223,14 @@ func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1be
 func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
 	defer c.queue.ShutDown()

-	logrus.Info("Starting health check...")
+	c.logger.Info("Starting health check...")
 	go func() {
 		if err := c.server.ListenAndServe(); err != nil {
 			if err == http.ErrServerClosed {
-				logrus.WithError(err).Info("health check: http server closed")
+				c.logger.WithError(err).Info("health check: http server closed")
 			} else {
 				err := errors.Wrap(err, "Could not listen on :8080")
-				runtime.HandleError(logrus.WithError(err), err)
+				runtime.HandleError(c.logger.WithError(err), err)
 			}
 		}
 	}()
@@ -239,7 +241,7 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
 		return err
 	}

-	logrus.Info("Wait for cache sync")
+	c.logger.Info("Wait for cache sync")
 	if !cache.WaitForCacheSync(stop, c.gameServerSynced) {
 		return errors.New("failed to wait for caches to sync")
 	}
@@ -252,7 +254,7 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
 	// Run the Health Controller
 	go c.healthController.Run(stop)

-	logrus.Info("Starting workers...")
+	c.logger.Info("Starting workers...")
 	for i := 0; i < threadiness; i++ {
 		go wait.Until(c.runWorker, time.Second, stop)
 	}
@@ -269,7 +271,7 @@ func (c Controller) enqueueGameServer(obj interface{}) {
 	var err error
 	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
 		err := errors.Wrap(err, "Error creating key for object")
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(c.logger.WithField("obj", obj), err)
 		return
 	}
 	c.queue.AddRateLimited(key)
@@ -290,12 +292,12 @@ func (c *Controller) processNextWorkItem() bool {
 	}
 	defer c.queue.Done(obj)

-	logrus.WithField("obj", obj).Info("Processing obj")
+	c.logger.WithField("obj", obj).Info("Processing obj")

 	var key string
 	var ok bool
 	if key, ok = obj.(string); !ok {
-		runtime.HandleError(logrus.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
+		runtime.HandleError(c.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
 		// this is a bad entry, we don't want to reprocess
 		c.queue.Forget(obj)
 		return true
@@ -303,7 +305,7 @@ func (c *Controller) processNextWorkItem() bool {

 	if err := c.syncHandler(key); err != nil {
 		// we don't forget here, because we want this to be retried via the queue
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(c.logger.WithField("obj", obj), err)
 		c.queue.AddRateLimited(obj)
 		return true
 	}
@@ -315,20 +317,20 @@ func (c *Controller) processNextWorkItem() bool {

 // syncGameServer synchronises the Pods for the GameServers.
 // and reacts to status changes that can occur through the client SDK
 func (c *Controller) syncGameServer(key string) error {
-	logrus.WithField("key", key).Info("Synchronising")
+	c.logger.WithField("key", key).Info("Synchronising")

 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		// don't return an error, as we don't want this retried
-		runtime.HandleError(logrus.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
+		runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
 		return nil
 	}

 	gs, err := c.gameServerLister.GameServers(namespace).Get(name)
 	if err != nil {
 		if k8serrors.IsNotFound(err) {
-			logrus.WithField("key", key).Info("GameServer is no longer available for syncing")
+			c.logger.WithField("key", key).Info("GameServer is no longer available for syncing")
 			return nil
 		}
 		return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace)
@@ -362,14 +364,14 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *stablev1alpha1.GameServ
 		return gs, nil
 	}

-	logrus.WithField("gs", gs).Info("Syncing with Deletion Timestamp")
+	c.logger.WithField("gs", gs).Info("Syncing with Deletion Timestamp")
 	pods, err := c.listGameServerPods(gs)
 	if err != nil {
 		return gs, err
 	}

 	if len(pods) > 0 {
-		logrus.WithField("pods", pods).WithField("gsName", gs.ObjectMeta.Name).Info("Found pods, deleting")
+		c.logger.WithField("pods", pods).WithField("gsName", gs.ObjectMeta.Name).Info("Found pods, deleting")
 		for _, p := range pods {
 			err := c.podGetter.Pods(p.ObjectMeta.Namespace).Delete(p.ObjectMeta.Name, nil)
 			if err != nil {
@@ -389,7 +391,7 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *stablev1alpha1.GameServ
 		}
 	}
 	gsCopy.ObjectMeta.Finalizers = fin
-	logrus.WithField("gs", gsCopy).Infof("No pods found, removing finalizer %s", stable.GroupName)
+	c.logger.WithField("gs", gsCopy).Infof("No pods found, removing finalizer %s", stable.GroupName)
 	gs, err = c.gameServerGetter.GameServers(gsCopy.ObjectMeta.Namespace).Update(gsCopy)
 	return gs, errors.Wrapf(err, "error removing finalizer for GameServer %s", gsCopy.ObjectMeta.Name)
 }
@@ -410,7 +412,7 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe
 	gsCopy.Status.State = stablev1alpha1.Creating
 	c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated")

-	logrus.WithField("gs", gsCopy).Info("Syncing Port Allocation State")
+	c.logger.WithField("gs", gsCopy).Info("Syncing Port Allocation State")
 	gs, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy)
 	if err != nil {
 		// if the GameServer doesn't get updated with the port data, then put the port
@@ -429,7 +431,7 @@ func (c *Controller) syncGameServerCreatingState(gs *stablev1alpha1.GameServer)
 		return gs, nil
 	}

-	logrus.WithField("gs", gs).Info("Syncing Create State")
+	c.logger.WithField("gs", gs).Info("Syncing Create State")

 	// Maybe something went wrong, and the pod was created, but the state was never moved to Starting, so let's check
 	ret, err := c.listGameServerPods(gs)
@@ -443,17 +445,17 @@ func (c *Controller) syncGameServerCreatingState(gs *stablev1alpha1.GameServer)
 		// this shouldn't happen, but if it does.
 		if err != nil {
-			logrus.WithField("gameserver", gs).WithError(err).Error("error creating pod from Game Server")
+			c.logger.WithField("gameserver", gs).WithError(err).Error("error creating pod from Game Server")
 			return c.moveToErrorState(gs, err.Error())
 		}

 		c.addGameServerHealthCheck(gs, pod)

-		logrus.WithField("pod", pod).Info("creating Pod for GameServer")
+		c.logger.WithField("pod", pod).Info("creating Pod for GameServer")
 		pod, err = c.podGetter.Pods(gs.ObjectMeta.Namespace).Create(pod)
 		if err != nil {
 			if k8serrors.IsInvalid(err) {
-				logrus.WithField("pod", pod).WithField("gameserver", gs).Errorf("Pod created is invalid")
+				c.logger.WithField("pod", pod).WithField("gameserver", gs).Errorf("Pod created is invalid")
 				return c.moveToErrorState(gs, err.Error())
 			}
 			return gs, errors.Wrapf(err, "error creating Pod for GameServer %s", gs.Name)
@@ -513,9 +515,7 @@ func (c *Controller) sidecar(gs *stablev1alpha1.GameServer) corev1.Container {
 func (c *Controller) addGameServerHealthCheck(gs *stablev1alpha1.GameServer, pod *corev1.Pod) {
 	if !gs.Spec.Health.Disabled {
 		for i, c := range pod.Spec.Containers {
-			logrus.WithField("c", c.Name).WithField("Container", gs.Spec.Container).Info("Checking name and container")
 			if c.Name == gs.Spec.Container {
-				logrus.WithField("liveness", c.LivenessProbe).Info("Found container")
 				if c.LivenessProbe == nil {
 					c.LivenessProbe = &corev1.Probe{
 						Handler: corev1.Handler{
@@ -528,7 +528,6 @@ func (c *Controller) addGameServerHealthCheck(gs *stablev1alpha1.GameServer, pod
 						PeriodSeconds:    gs.Spec.Health.PeriodSeconds,
 						FailureThreshold: gs.Spec.Health.FailureThreshold,
 					}
-					logrus.WithField("container", c).WithField("pod", pod).Info("Final pod")
 					pod.Spec.Containers[i] = c
 				}
 				break
@@ -546,7 +545,7 @@ func (c *Controller) syncGameServerRequestReadyState(gs *stablev1alpha1.GameServ
 		return gs, nil
 	}

-	logrus.WithField("gs", gs).Info("Syncing RequestReady State")
+	c.logger.WithField("gs", gs).Info("Syncing RequestReady State")

 	pod, err := c.gameServerPod(gs)
 	if err != nil {
@@ -580,7 +579,7 @@ func (c *Controller) syncGameServerShutdownState(gs *stablev1alpha1.GameServer)
 		return nil
 	}

-	logrus.WithField("gs", gs).Info("Syncing Shutdown State")
+	c.logger.WithField("gs", gs).Info("Syncing Shutdown State")
 	// Do it in the foreground, so the GameServer gets killed last
 	p := metav1.DeletePropagationForeground
 	err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p})
@@ -659,7 +658,7 @@ func (c Controller) Address(pod *corev1.Pod) (string, error) {
 	}

 	// minikube only has an InternalIP on a Node, so we'll fall back to that.
-	logrus.WithField("node", node.ObjectMeta.Name).Warn("Could not find ExternalIP. Falling back to Internal")
+	c.logger.WithField("node", node.ObjectMeta.Name).Warn("Could not find ExternalIP. Falling back to Internal")
Falling back to Internal") for _, a := range node.Status.Addresses { if a.Type == corev1.NodeInternalIP { return a.Address, nil @@ -682,7 +681,7 @@ func (c Controller) waitForEstablishedCRD() error { switch cond.Type { case apiv1beta1.Established: if cond.Status == apiv1beta1.ConditionTrue { - logrus.WithField("crd", crd).Info("GameServer custom resource definition is established") + c.logger.WithField("crd", crd).Info("GameServer custom resource definition is established") return true, err } } diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go index 76592baaaf..14bf46f9d3 100644 --- a/pkg/gameservers/health.go +++ b/pkg/gameservers/health.go @@ -45,6 +45,7 @@ import ( // an Unhealthy state if the GameServer main container exits when in // a Ready state type HealthController struct { + logger *logrus.Entry podSynced cache.InformerSynced podLister corelisterv1.PodLister gameServerGetter getterv1alpha1.GameServersGetter @@ -66,8 +67,10 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".HealthController"), } + hc.logger = runtime.NewLoggerWithType(hc) + eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(logrus.Infof) + eventBroadcaster.StartLogging(hc.logger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) hc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "health-controller"}) @@ -77,7 +80,7 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned if owner := metav1.GetControllerOf(pod); owner != nil && owner.Kind == "GameServer" { if v1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.Labels)) && hc.failedContainer(pod) { key := pod.ObjectMeta.Namespace + "/" + owner.Name - logrus.WithField("key", key).Info("GameServer container has terminated") + hc.logger.WithField("key", key).Info("GameServer container has terminated") hc.enqueueGameServer(key) } } @@ -109,10 +112,10 @@ func (hc *HealthController) enqueueGameServer(key string) { func (hc *HealthController) Run(stop <-chan struct{}) { defer hc.queue.ShutDown() - logrus.Info("Starting health worker") + hc.logger.Info("Starting worker") go wait.Until(hc.runWorker, time.Second, stop) <-stop - logrus.Info("Shut down health worker") + hc.logger.Info("Shut down worker") } // runWorker is a long-running function that will continually call the @@ -130,12 +133,12 @@ func (hc *HealthController) processNextWorkItem() bool { } defer hc.queue.Done(obj) - logrus.WithField("obj", obj).Info("Processing obj") + hc.logger.WithField("obj", obj).Info("Processing obj") var key string var ok bool if key, ok = obj.(string); !ok { - runtime.HandleError(logrus.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) + runtime.HandleError(hc.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) // this is a bad entry, we don't want to reprocess hc.queue.Forget(obj) return true @@ -143,7 +146,7 @@ func (hc *HealthController) processNextWorkItem() bool { if err := hc.syncGameServer(key); err != nil { // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(logrus.WithField("obj", obj), err) + runtime.HandleError(hc.logger.WithField("obj", obj), err) hc.queue.AddRateLimited(obj) return true } @@ -154,27 +157,27 @@ func (hc *HealthController) 
diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go
index 76592baaaf..14bf46f9d3 100644
--- a/pkg/gameservers/health.go
+++ b/pkg/gameservers/health.go
@@ -45,6 +45,7 @@ import (
 // an Unhealthy state if the GameServer main container exits when in
 // a Ready state
 type HealthController struct {
+	logger           *logrus.Entry
 	podSynced        cache.InformerSynced
 	podLister        corelisterv1.PodLister
 	gameServerGetter getterv1alpha1.GameServersGetter
@@ -66,8 +67,10 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned
 		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".HealthController"),
 	}

+	hc.logger = runtime.NewLoggerWithType(hc)
+
 	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartLogging(logrus.Infof)
+	eventBroadcaster.StartLogging(hc.logger.Infof)
 	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 	hc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "health-controller"})

@@ -77,7 +80,7 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned
 			if owner := metav1.GetControllerOf(pod); owner != nil && owner.Kind == "GameServer" {
 				if v1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.Labels)) && hc.failedContainer(pod) {
 					key := pod.ObjectMeta.Namespace + "/" + owner.Name
-					logrus.WithField("key", key).Info("GameServer container has terminated")
+					hc.logger.WithField("key", key).Info("GameServer container has terminated")
 					hc.enqueueGameServer(key)
 				}
 			}
@@ -109,10 +112,10 @@ func (hc *HealthController) enqueueGameServer(key string) {
 func (hc *HealthController) Run(stop <-chan struct{}) {
 	defer hc.queue.ShutDown()

-	logrus.Info("Starting health worker")
+	hc.logger.Info("Starting worker")
 	go wait.Until(hc.runWorker, time.Second, stop)
 	<-stop
-	logrus.Info("Shut down health worker")
+	hc.logger.Info("Shut down worker")
 }

 // runWorker is a long-running function that will continually call the
@@ -130,12 +133,12 @@ func (hc *HealthController) processNextWorkItem() bool {
 	}
 	defer hc.queue.Done(obj)

-	logrus.WithField("obj", obj).Info("Processing obj")
+	hc.logger.WithField("obj", obj).Info("Processing obj")

 	var key string
 	var ok bool
 	if key, ok = obj.(string); !ok {
-		runtime.HandleError(logrus.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
+		runtime.HandleError(hc.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
 		// this is a bad entry, we don't want to reprocess
 		hc.queue.Forget(obj)
 		return true
@@ -143,7 +146,7 @@ func (hc *HealthController) processNextWorkItem() bool {

 	if err := hc.syncGameServer(key); err != nil {
 		// we don't forget here, because we want this to be retried via the queue
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(hc.logger.WithField("obj", obj), err)
 		hc.queue.AddRateLimited(obj)
 		return true
 	}
@@ -154,27 +157,27 @@ func (hc *HealthController) processNextWorkItem() bool {

 // syncGameServer sets the GameSerer to Unhealthy, if its state is Ready
 func (hc *HealthController) syncGameServer(key string) error {
-	logrus.WithField("key", key).Info("Synchronising")
+	hc.logger.WithField("key", key).Info("Synchronising")

 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		// don't return an error, as we don't want this retried
-		runtime.HandleError(logrus.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
+		runtime.HandleError(hc.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
 		return nil
 	}

 	gs, err := hc.gameServerLister.GameServers(namespace).Get(name)
 	if err != nil {
 		if k8serrors.IsNotFound(err) {
-			logrus.WithField("key", key).Info("GameServer is no longer available for syncing")
+			hc.logger.WithField("key", key).Info("GameServer is no longer available for syncing")
 			return nil
 		}
 		return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace)
 	}

 	if gs.Status.State == v1alpha1.Ready {
-		logrus.WithField("gs", gs).Infof("Marking GameServer as Unhealthy")
+		hc.logger.WithField("gs", gs).Infof("Marking GameServer as Unhealthy")
 		gsCopy := gs.DeepCopy()
 		gsCopy.Status.State = v1alpha1.Unhealthy
diff --git a/pkg/gameservers/portallocator.go b/pkg/gameservers/portallocator.go
index 4c79b3a812..67be8d1a57 100644
--- a/pkg/gameservers/portallocator.go
+++ b/pkg/gameservers/portallocator.go
@@ -42,6 +42,7 @@ type portAllocation map[int32]bool
 // The PortAllocator does not currently support mixing static portAllocations (or any pods with defined HostPort)
 // within the dynamic port range other than the ones it coordinates.
 type PortAllocator struct {
+	logger          *logrus.Entry
 	mutex           sync.RWMutex
 	portAllocations []portAllocation
 	minPort         int32
@@ -60,7 +61,6 @@ type PortAllocator struct {
 func NewPortAllocator(minPort, maxPort int32,
 	kubeInformerFactory informers.SharedInformerFactory,
 	agonesInformerFactory externalversions.SharedInformerFactory) *PortAllocator {
-	logrus.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting port allocator")
 	v1 := kubeInformerFactory.Core().V1()
 	nodes := v1.Nodes()
@@ -77,14 +77,16 @@ func NewPortAllocator(minPort, maxPort int32,
 		nodeInformer: nodes.Informer(),
 		nodeSynced:   nodes.Informer().HasSynced,
 	}
+	pa.logger = runtime.NewLoggerWithType(pa)
+	pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")

 	return pa
 }

 // Run sets up the current state of port allocations and
 // starts tracking Pod and Node changes
 func (pa *PortAllocator) Run(stop <-chan struct{}) error {
-	logrus.Info("Running Pod Allocator")
+	pa.logger.Info("Running")
 	pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		DeleteFunc: pa.syncDeleteGameServer,
 	})
@@ -99,7 +101,7 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
 				err := pa.syncPortAllocations(stop)
 				if err != nil {
 					err := errors.Wrap(err, "error resetting ports on node update")
-					runtime.HandleError(logrus.WithField("node", newNode), err)
+					runtime.HandleError(pa.logger.WithField("node", newNode), err)
 				}
 			}
 		},
@@ -107,12 +109,12 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
 			err := pa.syncPortAllocations(stop)
 			if err != nil {
 				err := errors.Wrap(err, "error on node deletion")
-				runtime.HandleError(logrus.WithField("node", obj), err)
+				runtime.HandleError(pa.logger.WithField("node", obj), err)
 			}
 		},
 	})

-	logrus.Info("Flush cache sync, before syncing gameserver and node state")
syncing gameserver and node state") + pa.logger.Info("Flush cache sync, before syncing gameserver and node state") if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) { return nil } @@ -150,7 +152,7 @@ func (pa *PortAllocator) syncAddNode(obj interface{}) { defer pa.mutex.Unlock() node := obj.(*corev1.Node) - logrus.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations") + pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations") ports := portAllocation{} for i := pa.minPort; i <= pa.maxPort; i++ { @@ -164,7 +166,7 @@ func (pa *PortAllocator) syncAddNode(obj interface{}) { // make the HostPort available func (pa *PortAllocator) syncDeleteGameServer(object interface{}) { gs := object.(*v1alpha1.GameServer) - logrus.WithField("gs", gs).Info("syncing deleted GameServer") + pa.logger.WithField("gs", gs).Info("syncing deleted GameServer") pa.DeAllocate(gs.Spec.HostPort) } @@ -178,7 +180,7 @@ func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error { pa.mutex.Lock() defer pa.mutex.Unlock() - logrus.Info("Resetting Port Allocation") + pa.logger.Info("Resetting Port Allocation") if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) { return nil diff --git a/pkg/gameservers/sdkserver.go b/pkg/gameservers/sdkserver.go index c8ae690f13..698066256c 100644 --- a/pkg/gameservers/sdkserver.go +++ b/pkg/gameservers/sdkserver.go @@ -46,6 +46,7 @@ var _ sdk.SDKServer = &SDKServer{} // SDKServer is a gRPC server, that is meant to be a sidecar // for a GameServer that will update the game server status on SDK requests type SDKServer struct { + logger *logrus.Entry gameServerName string namespace string gameServerGetter typedv1alpha1.GameServersGetter @@ -68,18 +69,6 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interface, agonesClient versioned.Interface) (*SDKServer, error) { mux := http.NewServeMux() - mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - _, err := w.Write([]byte("ok")) - if err != nil { - logrus.WithError(err).Error("could not send ok response on healthz") - w.WriteHeader(http.StatusInternalServerError) - } - }) - - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(logrus.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-sidecar"}) s := &SDKServer{ gameServerName: gameServerName, @@ -95,14 +84,27 @@ func NewSDKServer(gameServerName, namespace string, healthTimeout: healthTimeout, healthMutex: sync.RWMutex{}, healthFailureCount: 0, - recorder: recorder, } + s.logger = runtime.NewLoggerWithType(s) + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(s.logger.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + s.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-sidecar"}) + + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("ok")) + if err != nil { + s.logger.WithError(err).Error("could not send ok response on healthz") + w.WriteHeader(http.StatusInternalServerError) + } + }) mux.HandleFunc("/gshealthz", func(w http.ResponseWriter, r *http.Request) { if s.healthy() { _, err := w.Write([]byte("ok")) if err != nil { - logrus.WithError(err).Error("could not 
send ok response on gshealthz") + s.logger.WithError(err).Error("could not send ok response on gshealthz") w.WriteHeader(http.StatusInternalServerError) } } else { @@ -113,7 +115,7 @@ func NewSDKServer(gameServerName, namespace string, s.initHealthLastUpdated(healthInitialDelay) s.queue = s.newWorkQueue() - logrus.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar") + s.logger.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar") return s, nil } @@ -134,28 +136,28 @@ func (s *SDKServer) newWorkQueue() workqueue.RateLimitingInterface { func (s *SDKServer) Run(stop <-chan struct{}) { defer s.queue.ShutDown() - logrus.Info("Starting SDKServer http health check...") + s.logger.Info("Starting SDKServer http health check...") go func() { if err := s.server.ListenAndServe(); err != nil { if err == http.ErrServerClosed { - logrus.WithError(err).Info("health check: http server closed") + s.logger.WithError(err).Info("health check: http server closed") } else { err := errors.Wrap(err, "Could not listen on :8080") - runtime.HandleError(logrus.WithError(err), err) + runtime.HandleError(s.logger.WithError(err), err) } } }() defer s.server.Close() // nolint: errcheck if !s.healthDisabled { - logrus.Info("Starting GameServer health checking") + s.logger.Info("Starting GameServer health checking") go wait.Until(s.runHealth, s.healthTimeout, stop) } - logrus.Info("Starting worker") + s.logger.Info("Starting worker") go wait.Until(s.runWorker, time.Second, stop) <-stop - logrus.Info("Shut down workers and health checking") + s.logger.Info("Shut down workers and health checking") } // runWorker is a long-running function that will continually call the @@ -173,12 +175,12 @@ func (s *SDKServer) processNextWorkItem() bool { } defer s.queue.Done(obj) - logrus.WithField("obj", obj).Info("Processing obj") + s.logger.WithField("obj", obj).Info("Processing obj") var state stablev1alpha1.State var ok bool if state, ok = obj.(stablev1alpha1.State); !ok { - runtime.HandleError(logrus.WithField("obj", obj), errors.Errorf("expected State in queue, but got %T", obj)) + runtime.HandleError(s.logger.WithField("obj", obj), errors.Errorf("expected State in queue, but got %T", obj)) // this is a bad entry, we don't want to reprocess s.queue.Forget(obj) return true @@ -186,7 +188,7 @@ func (s *SDKServer) processNextWorkItem() bool { if err := s.updateState(state); err != nil { // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(logrus.WithField("obj", obj), err) + runtime.HandleError(s.logger.WithField("obj", obj), err) s.queue.AddRateLimited(obj) return true } @@ -198,7 +200,7 @@ func (s *SDKServer) processNextWorkItem() bool { // updateState sets the GameServer Status's state to the state // that has been passed through func (s *SDKServer) updateState(state stablev1alpha1.State) error { - logrus.WithField("state", state).Info("Updating state") + s.logger.WithField("state", state).Info("Updating state") gameServers := s.gameServerGetter.GameServers(s.namespace) gs, err := gameServers.Get(s.gameServerName, metav1.GetOptions{}) if err != nil { @@ -207,7 +209,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.State) error { // if the state is currently unhealthy, you can't go back to Ready if gs.Status.State == stablev1alpha1.Unhealthy { - logrus.Info("State already unhealthy. Skipping update.") + s.logger.Info("State already unhealthy. 
Skipping update.") return nil } @@ -225,7 +227,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.State) error { // Ready enters the RequestReady state change for this GameServer into // the workqueue so it can be updated func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { - logrus.Info("Received Ready request, adding to queue") + s.logger.Info("Received Ready request, adding to queue") s.queue.AddRateLimited(stablev1alpha1.RequestReady) return e, nil } @@ -233,7 +235,7 @@ func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) // Shutdown enters the Shutdown state change for this GameServer into // the workqueue so it can be updated func (s *SDKServer) Shutdown(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { - logrus.Info("Received Shutdown request, adding to queue") + s.logger.Info("Received Shutdown request, adding to queue") s.queue.AddRateLimited(stablev1alpha1.Shutdown) return e, nil } @@ -244,13 +246,13 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error { for { _, err := stream.Recv() if err == io.EOF { - logrus.Info("Health stream closed.") + s.logger.Info("Health stream closed.") return stream.SendAndClose(&sdk.Empty{}) } if err != nil { return errors.Wrap(err, "Error with Health check") } - logrus.Info("Health Ping Received") + s.logger.Info("Health Ping Received") s.touchHealthLastUpdated() } } @@ -261,7 +263,7 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error { func (s *SDKServer) runHealth() { s.checkHealth() if !s.healthy() { - logrus.WithField("gameServerName", s.gameServerName).Info("being marked as not healthy") + s.logger.WithField("gameServerName", s.gameServerName).Info("being marked as not healthy") s.queue.AddRateLimited(stablev1alpha1.Unhealthy) } } @@ -276,7 +278,7 @@ func (s *SDKServer) touchHealthLastUpdated() { } // checkHealth checks the healthLastUpdated value -// and if it is outside the timeout value, log and +// and if it is outside the timeout value, logger and // count a failure func (s *SDKServer) checkHealth() { timeout := s.healthLastUpdated.Add(s.healthTimeout) @@ -284,7 +286,7 @@ func (s *SDKServer) checkHealth() { s.healthMutex.Lock() defer s.healthMutex.Unlock() s.healthFailureCount++ - logrus.WithField("failureCount", s.healthFailureCount).Infof("GameServer Health Check failed") + s.logger.WithField("failureCount", s.healthFailureCount).Infof("GameServer Health Check failed") } } diff --git a/pkg/util/runtime/runtime.go b/pkg/util/runtime/runtime.go index a8afa350db..433f2caef8 100644 --- a/pkg/util/runtime/runtime.go +++ b/pkg/util/runtime/runtime.go @@ -24,6 +24,8 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" ) +const sourceKey = "source" + // stackTracer is the pkg/errors stacktrace interface type stackTracer interface { StackTrace() errors.StackTrace @@ -31,6 +33,8 @@ type stackTracer interface { // replace the standard glog error logger, with a logrus one func init() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + runtime.ErrorHandlers[0] = func(err error) { if stackTrace, ok := err.(stackTracer); ok { var stack []string @@ -58,3 +62,14 @@ func Must(err error) { panic(err) } } + +// NewLoggerWithSource returns a logrus.Entry to use when you want to specify an source +func NewLoggerWithSource(source string) *logrus.Entry { + return logrus.WithField(sourceKey, source) +} + +// NewLoggerWithType returns a logrus.Entry to use when you want to use a data type as the source +// such as when you have a struct with methods +func 
diff --git a/pkg/util/runtime/runtime.go b/pkg/util/runtime/runtime.go
index a8afa350db..433f2caef8 100644
--- a/pkg/util/runtime/runtime.go
+++ b/pkg/util/runtime/runtime.go
@@ -24,6 +24,8 @@ import (
 	"k8s.io/apimachinery/pkg/util/runtime"
 )

+const sourceKey = "source"
+
 // stackTracer is the pkg/errors stacktrace interface
 type stackTracer interface {
 	StackTrace() errors.StackTrace
@@ -31,6 +33,8 @@ type stackTracer interface {

 // replace the standard glog error logger, with a logrus one
 func init() {
+	logrus.SetFormatter(&logrus.JSONFormatter{})
+
 	runtime.ErrorHandlers[0] = func(err error) {
 		if stackTrace, ok := err.(stackTracer); ok {
 			var stack []string
@@ -58,3 +62,14 @@ func Must(err error) {
 		panic(err)
 	}
 }
+
+// NewLoggerWithSource returns a logrus.Entry to use when you want to specify a source
+func NewLoggerWithSource(source string) *logrus.Entry {
+	return logrus.WithField(sourceKey, source)
+}
+
+// NewLoggerWithType returns a logrus.Entry to use when you want to use a data type as the source
+// such as when you have a struct with methods
+func NewLoggerWithType(obj interface{}) *logrus.Entry {
+	return NewLoggerWithSource(fmt.Sprintf("%T", obj))
+}
diff --git a/pkg/util/webhooks/webhooks.go b/pkg/util/webhooks/webhooks.go
index 52116c1af9..de251ee695 100644
--- a/pkg/util/webhooks/webhooks.go
+++ b/pkg/util/webhooks/webhooks.go
@@ -34,6 +34,7 @@ type Server interface {

 // WebHook manage Kubernetes webhooks
 type WebHook struct {
+	logger   *logrus.Entry
 	mux      *http.ServeMux
 	server   Server
 	certFile string
@@ -60,13 +61,16 @@ func NewWebHook(certFile, keyFile string) *WebHook {
 		Handler: mux,
 	}

-	return &WebHook{
+	wh := &WebHook{
 		mux:      mux,
 		server:   &server,
 		certFile: certFile,
 		keyFile:  keyFile,
 		handlers: map[string][]operationHandler{},
 	}
+	wh.logger = runtime.NewLoggerWithType(wh)
+
+	return wh
 }

 // Run runs the webhook server, starting a https listener.
@@ -77,11 +81,11 @@ func (wh *WebHook) Run(stop <-chan struct{}) error {
 		wh.server.Close() // nolint: errcheck
 	}()

-	logrus.WithField("webook", wh).Infof("webhook: https server started")
+	wh.logger.WithField("webhook", wh).Infof("https server started")

 	err := wh.server.ListenAndServeTLS(wh.certFile, wh.keyFile)
 	if err == http.ErrServerClosed {
-		logrus.WithError(err).Info("webhook: https server closed")
+		wh.logger.WithError(err).Info("https server closed")
 		return nil
 	}

@@ -94,18 +98,18 @@ func (wh *WebHook) AddHandler(path string, gk schema.GroupKind, op v1beta1.Opera
 		wh.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
 			err := wh.handle(path, w, r)
 			if err != nil {
-				runtime.HandleError(logrus.WithField("url", r.URL), err)
+				runtime.HandleError(wh.logger.WithField("url", r.URL), err)
 				w.WriteHeader(http.StatusInternalServerError)
 			}
 		})
 	}
-	logrus.WithField("path", path).WithField("groupKind", gk).WithField("op", op).Info("Added webhook handler")
+	wh.logger.WithField("path", path).WithField("groupKind", gk).WithField("op", op).Info("Added webhook handler")
 	wh.handlers[path] = append(wh.handlers[path], operationHandler{groupKind: gk, operation: op, handler: h})
 }

 // handle Handles http requests for webhooks
 func (wh *WebHook) handle(path string, w http.ResponseWriter, r *http.Request) error {
-	logrus.WithField("path", path).Info("running webhook")
+	wh.logger.WithField("path", path).Info("running webhook")

 	var review v1beta1.AdmissionReview
 	err := json.NewDecoder(r.Body).Decode(&review)
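Taken together, the refactor means any component's log lines can be isolated by filtering on the source field. As a closing check, this self-contained program mirrors the two runtime helpers (the function bodies are copied from the diff; the surrounding package and the webHook type are hypothetical) and can be run stand-alone to confirm the output shape:

	package main

	import (
		"fmt"

		"github.com/sirupsen/logrus"
	)

	const sourceKey = "source"

	// mirrors pkg/util/runtime.NewLoggerWithSource
	func newLoggerWithSource(source string) *logrus.Entry {
		return logrus.WithField(sourceKey, source)
	}

	// mirrors pkg/util/runtime.NewLoggerWithType
	func newLoggerWithType(obj interface{}) *logrus.Entry {
		return newLoggerWithSource(fmt.Sprintf("%T", obj))
	}

	type webHook struct{}

	func main() {
		logrus.SetFormatter(&logrus.JSONFormatter{})
		newLoggerWithType(&webHook{}).Info("https server started")
		// {"level":"info","msg":"https server started","source":"*main.webHook","time":"..."}
	}
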