From d99b932940d1b1c180af0977586b8c42cb3741df Mon Sep 17 00:00:00 2001
From: Mark Mandel
Date: Thu, 22 Feb 2018 14:54:57 -0800
Subject: [PATCH] Add a `source` to all log statements

This includes a `logger` as a data member on all objects, which sets a
default "source" field on every log statement so that it's easy to track
where log messages are coming from. This is particularly useful as the
controller for Agones gets more complicated.
---
 cmd/controller/main.go                 | 33 ++++++------
 cmd/sdk-server/main.go                 | 21 ++++----
 pkg/apis/stable/v1alpha1/types_test.go |  2 -
 pkg/gameservers/controller.go          | 73 +++++++++++++-------------
 pkg/gameservers/health.go              | 25 +++++----
 pkg/gameservers/portallocator.go       | 18 ++++---
 pkg/gameservers/sdkserver.go           | 68 ++++++++++++------------
 pkg/util/runtime/runtime.go            | 15 ++++++
 pkg/util/webhooks/webhooks.go          | 16 +++---
 9 files changed, 146 insertions(+), 125 deletions(-)

diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 39fcb40dbe..f0b30c492e 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -16,10 +16,10 @@
 package main
 
 import (
-	"strings"
-	"time"
 	"os"
 	"path/filepath"
+	"strings"
+	"time"
 
 	"agones.dev/agones/pkg"
 	"agones.dev/agones/pkg/client/clientset/versioned"
@@ -28,7 +28,6 @@ import (
 	"agones.dev/agones/pkg/util/runtime"
 	"agones.dev/agones/pkg/util/signals"
 	"agones.dev/agones/pkg/util/webhooks"
-	"github.com/sirupsen/logrus"
 	"github.com/spf13/pflag"
 	"github.com/spf13/viper"
 	extclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@@ -46,15 +45,15 @@ const (
 	keyFileFlag = "key-file"
 )
 
-func init() {
-	logrus.SetFormatter(&logrus.JSONFormatter{})
-}
+var (
+	logger = runtime.NewLoggerWithSource("main")
+)
 
 // main starts the operator for the gameserver CRD
 func main() {
 	exec, err := os.Executable()
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not get executable path")
+		logger.WithError(err).Fatal("Could not get executable path")
 	}
 	base := filepath.Dir(exec)
 
@@ -87,7 +86,7 @@ func main() {
 	keyFile := viper.GetString(keyFileFlag)
 	certFile := viper.GetString(certFileFlag)
 
-	logrus.WithField(sidecarFlag, sidecarImage).
+	logger.WithField(sidecarFlag, sidecarImage).
 		WithField("minPort", minPort).
 		WithField("maxPort", maxPort).
 		WithField(keyFileFlag, keyFile).
@@ -96,29 +95,29 @@ func main() {
 		WithField("Version", pkg.Version).Info("starting gameServer operator...")
 
 	if minPort <= 0 || maxPort <= 0 {
-		logrus.Fatal("Min Port and Max Port values are required.")
+		logger.Fatal("Min Port and Max Port values are required.")
 	} else if maxPort < minPort {
-		logrus.Fatal("Max Port cannot be set less that the Min Port")
+		logger.Fatal("Max Port cannot be set less than the Min Port")
 	}
 
 	config, err := rest.InClusterConfig()
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create in cluster config")
+		logger.WithError(err).Fatal("Could not create in cluster config")
 	}
 
 	kubeClient, err := kubernetes.NewForConfig(config)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create the kubernetes clientset")
+		logger.WithError(err).Fatal("Could not create the kubernetes clientset")
 	}
 
 	extClient, err := extclientset.NewForConfig(config)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create the api extension clientset")
+		logger.WithError(err).Fatal("Could not create the api extension clientset")
 	}
 
 	agonesClient, err := versioned.NewForConfig(config)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not create the agones api clientset")
+		logger.WithError(err).Fatal("Could not create the agones api clientset")
 	}
 
 	agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, 30*time.Second)
@@ -134,14 +133,14 @@ func main() {
 
 	go func() {
 		if err := wh.Run(stop); err != nil { // nolint: vetshadow
-			logrus.WithError(err).Fatal("could not run webhook server")
+			logger.WithError(err).Fatal("could not run webhook server")
 		}
 	}()
 
 	err = c.Run(2, stop)
 	if err != nil {
-		logrus.WithError(err).Fatal("Could not run gameserver controller")
+		logger.WithError(err).Fatal("Could not run gameserver controller")
 	}
 
-	logrus.Info("Shut down gameserver controller")
+	logger.Info("Shut down gameserver controller")
 }
diff --git a/cmd/sdk-server/main.go b/cmd/sdk-server/main.go
index c0d5035e1a..441884b93f 100644
--- a/cmd/sdk-server/main.go
+++ b/cmd/sdk-server/main.go
@@ -26,7 +26,6 @@ import (
 	"agones.dev/agones/pkg/gameservers"
 	"agones.dev/agones/pkg/sdk"
 	"agones.dev/agones/pkg/util/runtime"
-	"github.com/sirupsen/logrus"
 	"github.com/spf13/pflag"
 	"github.com/spf13/viper"
 	"golang.org/x/net/context"
@@ -51,9 +50,9 @@ const (
 	healthFailureThresholdFlag = "health-failure-threshold"
 )
 
-func init() {
-	logrus.SetFormatter(&logrus.JSONFormatter{})
-}
+var (
+	logger = runtime.NewLoggerWithSource("main")
+)
 
 func main() {
 	viper.SetDefault(localFlag, false)
@@ -92,7 +91,7 @@ func main() {
 	healthInitialDelay := time.Duration(viper.GetInt64(healthInitialDelayFlag)) * time.Second
 	healthFailureThreshold := viper.GetInt64(healthFailureThresholdFlag)
 
-	logrus.WithField(localFlag, isLocal).WithField("version", pkg.Version).
+	logger.WithField(localFlag, isLocal).WithField("version", pkg.Version).
 		WithField("port", port).WithField(addressFlag, address).
 		WithField(healthDisabledFlag, healthDisabled).WithField(healthTimeoutFlag, healthTimeout).
 		WithField(healthFailureThresholdFlag, healthFailureThreshold).
@@ -100,7 +99,7 @@ func main() {
 
 	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
 	if err != nil {
-		logrus.WithField("port", port).WithField("address", address).Fatalf("Could not listen on port")
+		logger.WithField("port", port).WithField("address", address).Fatalf("Could not listen on port")
 	}
 
 	grpcServer := grpc.NewServer()
@@ -109,24 +108,24 @@
 	} else {
 		config, err := rest.InClusterConfig()
 		if err != nil {
-			logrus.WithError(err).Fatal("Could not create in cluster config")
+			logger.WithError(err).Fatal("Could not create in cluster config")
 		}
 
 		kubeClient, err := kubernetes.NewForConfig(config)
 		if err != nil {
-			logrus.WithError(err).Fatal("Could not create the kubernetes clientset")
+			logger.WithError(err).Fatal("Could not create the kubernetes clientset")
 		}
 
 		agonesClient, err := versioned.NewForConfig(config)
 		if err != nil {
-			logrus.WithError(err).Fatalf("Could not create the agones api clientset")
+			logger.WithError(err).Fatalf("Could not create the agones api clientset")
 		}
 
 		var s *gameservers.SDKServer
		s, err = gameservers.NewSDKServer(viper.GetString(gameServerNameEnv), viper.GetString(podNamespaceEnv),
			healthDisabled, healthTimeout, healthFailureThreshold, healthInitialDelay, kubeClient, agonesClient)
 		if err != nil {
-			logrus.WithError(err).Fatalf("Could not start sidecar")
+			logger.WithError(err).Fatalf("Could not start sidecar")
 		}
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
@@ -137,6 +136,6 @@ func main() {
 
 	err = grpcServer.Serve(lis)
 	if err != nil {
-		logrus.WithError(err).Error("Could not serve grpc server")
+		logger.WithError(err).Error("Could not serve grpc server")
 	}
 }
diff --git a/pkg/apis/stable/v1alpha1/types_test.go b/pkg/apis/stable/v1alpha1/types_test.go
index 264fb148d1..97e52c4dea 100644
--- a/pkg/apis/stable/v1alpha1/types_test.go
+++ b/pkg/apis/stable/v1alpha1/types_test.go
@@ -18,7 +18,6 @@ import (
 	"testing"
 
 	"agones.dev/agones/pkg/apis/stable"
-	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -218,7 +217,6 @@ func TestGameServerPod(t *testing.T) {
 	assert.Equal(t, fixture.Spec.HostPort, pod.Spec.Containers[0].Ports[0].HostPort)
 	assert.Equal(t, fixture.Spec.ContainerPort, pod.Spec.Containers[0].Ports[0].ContainerPort)
 	assert.Equal(t, corev1.Protocol("UDP"), pod.Spec.Containers[0].Ports[0].Protocol)
-	logrus.SetFormatter(&logrus.JSONFormatter{})
 	assert.True(t, metav1.IsControlledBy(pod, fixture))
 
 	sidecar := corev1.Container{Name: "sidecar", Image: "container/sidecar"}
diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go
index f4267f2c67..8e291e2cd1 100644
--- a/pkg/gameservers/controller.go
+++ b/pkg/gameservers/controller.go
@@ -57,6 +57,7 @@ var (
 
 // Controller is a the main GameServer crd controller
 type Controller struct {
+	logger                 *logrus.Entry
 	sidecarImage           string
 	alwaysPullSidecarImage bool
 	crdGetter              v1beta1.CustomResourceDefinitionInterface
@@ -90,11 +91,6 @@ func NewController(
 	gameServers := agonesInformerFactory.Stable().V1alpha1().GameServers()
 	gsInformer := gameServers.Informer()
 
-	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartLogging(logrus.Infof)
-	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})
-
 	c := &Controller{
 		sidecarImage:           sidecarImage,
 		alwaysPullSidecarImage: alwaysPullSidecarImage,
@@ -108,9 +104,15 @@ func NewController(
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".GameServerController"),
 		portAllocator:    NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory),
 		healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
-		recorder:         recorder,
 	}
 
+	c.logger = runtime.NewLoggerWithType(c)
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(c.logger.Infof)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})
+
 	wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationHandler)
 
 	gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -143,7 +145,7 @@ func NewController(
 	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 		_, err := w.Write([]byte("ok"))
 		if err != nil {
-			logrus.WithError(err).Error("could not send ok response on healthz")
+			c.logger.WithError(err).Error("could not send ok response on healthz")
 			w.WriteHeader(http.StatusInternalServerError)
 		}
 	})
@@ -160,7 +162,7 @@ func NewController(
 // the default values on the GameServer, and validates the results
 // Should only be called on gameserver create operations.
 func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) {
-	logrus.WithField("review", review).Info("creationHandler")
+	c.logger.WithField("review", review).Info("creationHandler")
 
 	obj := review.Request.Object
 	gs := &stablev1alpha1.GameServer{}
@@ -188,7 +190,7 @@ func (c *Controller) creationHandler(review admv1be
 			Details: &details,
 		}
 
-		logrus.WithField("review", review).Info("Invalid GameServer")
+		c.logger.WithField("review", review).Info("Invalid GameServer")
 		return review, nil
 	}
 
@@ -207,7 +209,7 @@ func (c *Controller) creationHandler(review admv1be
 		return review, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name)
 	}
 
-	logrus.WithField("gs", gs.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!")
+	c.logger.WithField("gs", gs.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!")
 
 	pt := admv1beta1.PatchTypeJSONPatch
 	review.Response.PatchType = &pt
@@ -221,14 +223,14 @@ func (c *Controller) creationHandler(review admv1be
 func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
 	defer c.queue.ShutDown()
 
-	logrus.Info("Starting health check...")
+	c.logger.Info("Starting health check...")
 	go func() {
 		if err := c.server.ListenAndServe(); err != nil {
 			if err == http.ErrServerClosed {
-				logrus.WithError(err).Info("health check: http server closed")
+				c.logger.WithError(err).Info("health check: http server closed")
 			} else {
 				err := errors.Wrap(err, "Could not listen on :8080")
-				runtime.HandleError(logrus.WithError(err), err)
+				runtime.HandleError(c.logger.WithError(err), err)
 			}
 		}
 	}()
@@ -239,7 +241,7 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
 		return err
 	}
 
-	logrus.Info("Wait for cache sync")
+	c.logger.Info("Wait for cache sync")
 	if !cache.WaitForCacheSync(stop, c.gameServerSynced) {
 		return errors.New("failed to wait for caches to sync")
 	}
@@ -252,7 +254,7 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
 	// Run the Health Controller
 	go c.healthController.Run(stop)
 
-	logrus.Info("Starting workers...")
+	c.logger.Info("Starting workers...")
 	for i := 0; i < threadiness; i++ {
 		go wait.Until(c.runWorker, time.Second, stop)
 	}
@@ -269,7 +271,7 @@ func (c Controller) enqueueGameServer(obj interface{}) {
 	var err error
 	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
 		err := errors.Wrap(err, "Error creating key for object")
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(c.logger.WithField("obj", obj), err)
 		return
 	}
 	c.queue.AddRateLimited(key)
@@ -290,12 +292,12 @@ func (c *Controller) processNextWorkItem() bool {
 	}
 	defer c.queue.Done(obj)
 
-	logrus.WithField("obj", obj).Info("Processing obj")
+	c.logger.WithField("obj", obj).Info("Processing obj")
 
 	var key string
 	var ok bool
 	if key, ok = obj.(string); !ok {
-		runtime.HandleError(logrus.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
+		runtime.HandleError(c.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
 		// this is a bad entry, we don't want to reprocess
 		c.queue.Forget(obj)
 		return true
@@ -303,7 +305,7 @@ func (c *Controller) processNextWorkItem() bool {
 
 	if err := c.syncHandler(key); err != nil {
 		// we don't forget here, because we want this to be retried via the queue
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(c.logger.WithField("obj", obj), err)
 		c.queue.AddRateLimited(obj)
 		return true
 	}
@@ -315,20 +317,20 @@ func (c *Controller) processNextWorkItem() bool {
 // syncGameServer synchronises the Pods for the GameServers.
 // and reacts to status changes that can occur through the client SDK
 func (c *Controller) syncGameServer(key string) error {
-	logrus.WithField("key", key).Info("Synchronising")
+	c.logger.WithField("key", key).Info("Synchronising")
 
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		// don't return an error, as we don't want this retried
-		runtime.HandleError(logrus.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
+		runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
 		return nil
 	}
 
 	gs, err := c.gameServerLister.GameServers(namespace).Get(name)
 	if err != nil {
 		if k8serrors.IsNotFound(err) {
-			logrus.WithField("key", key).Info("GameServer is no longer available for syncing")
+			c.logger.WithField("key", key).Info("GameServer is no longer available for syncing")
 			return nil
 		}
 		return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace)
@@ -362,14 +364,14 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *stablev1alpha1.GameServ
 		return gs, nil
 	}
 
-	logrus.WithField("gs", gs).Info("Syncing with Deletion Timestamp")
+	c.logger.WithField("gs", gs).Info("Syncing with Deletion Timestamp")
 	pods, err := c.listGameServerPods(gs)
 	if err != nil {
 		return gs, err
 	}
 
 	if len(pods) > 0 {
-		logrus.WithField("pods", pods).WithField("gsName", gs.ObjectMeta.Name).Info("Found pods, deleting")
+		c.logger.WithField("pods", pods).WithField("gsName", gs.ObjectMeta.Name).Info("Found pods, deleting")
 		for _, p := range pods {
 			err := c.podGetter.Pods(p.ObjectMeta.Namespace).Delete(p.ObjectMeta.Name, nil)
 			if err != nil {
@@ -389,7 +391,7 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *stablev1alpha1.GameServ
 		}
 	}
 	gsCopy.ObjectMeta.Finalizers = fin
-	logrus.WithField("gs", gsCopy).Infof("No pods found, removing finalizer %s", stable.GroupName)
finalizer %s", stable.GroupName) + c.logger.WithField("gs", gsCopy).Infof("No pods found, removing finalizer %s", stable.GroupName) gs, err = c.gameServerGetter.GameServers(gsCopy.ObjectMeta.Namespace).Update(gsCopy) return gs, errors.Wrapf(err, "error removing finalizer for GameServer %s", gsCopy.ObjectMeta.Name) } @@ -410,7 +412,7 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe gsCopy.Status.State = stablev1alpha1.Creating c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated") - logrus.WithField("gs", gsCopy).Info("Syncing Port Allocation State") + c.logger.WithField("gs", gsCopy).Info("Syncing Port Allocation State") gs, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy) if err != nil { // if the GameServer doesn't get updated with the port data, then put the port @@ -429,7 +431,7 @@ func (c *Controller) syncGameServerCreatingState(gs *stablev1alpha1.GameServer) return gs, nil } - logrus.WithField("gs", gs).Info("Syncing Create State") + c.logger.WithField("gs", gs).Info("Syncing Create State") // Maybe something went wrong, and the pod was created, but the state was never moved to Starting, so let's check ret, err := c.listGameServerPods(gs) @@ -443,17 +445,17 @@ func (c *Controller) syncGameServerCreatingState(gs *stablev1alpha1.GameServer) // this shouldn't happen, but if it does. if err != nil { - logrus.WithField("gameserver", gs).WithError(err).Error("error creating pod from Game Server") + c.logger.WithField("gameserver", gs).WithError(err).Error("error creating pod from Game Server") return c.moveToErrorState(gs, err.Error()) } c.addGameServerHealthCheck(gs, pod) - logrus.WithField("pod", pod).Info("creating Pod for GameServer") + c.logger.WithField("pod", pod).Info("creating Pod for GameServer") pod, err = c.podGetter.Pods(gs.ObjectMeta.Namespace).Create(pod) if err != nil { if k8serrors.IsInvalid(err) { - logrus.WithField("pod", pod).WithField("gameserver", gs).Errorf("Pod created is invalid") + c.logger.WithField("pod", pod).WithField("gameserver", gs).Errorf("Pod created is invalid") return c.moveToErrorState(gs, err.Error()) } return gs, errors.Wrapf(err, "error creating Pod for GameServer %s", gs.Name) @@ -513,9 +515,7 @@ func (c *Controller) sidecar(gs *stablev1alpha1.GameServer) corev1.Container { func (c *Controller) addGameServerHealthCheck(gs *stablev1alpha1.GameServer, pod *corev1.Pod) { if !gs.Spec.Health.Disabled { for i, c := range pod.Spec.Containers { - logrus.WithField("c", c.Name).WithField("Container", gs.Spec.Container).Info("Checking name and container") if c.Name == gs.Spec.Container { - logrus.WithField("liveness", c.LivenessProbe).Info("Found container") if c.LivenessProbe == nil { c.LivenessProbe = &corev1.Probe{ Handler: corev1.Handler{ @@ -528,7 +528,6 @@ func (c *Controller) addGameServerHealthCheck(gs *stablev1alpha1.GameServer, pod PeriodSeconds: gs.Spec.Health.PeriodSeconds, FailureThreshold: gs.Spec.Health.FailureThreshold, } - logrus.WithField("container", c).WithField("pod", pod).Info("Final pod") pod.Spec.Containers[i] = c } break @@ -546,7 +545,7 @@ func (c *Controller) syncGameServerRequestReadyState(gs *stablev1alpha1.GameServ return gs, nil } - logrus.WithField("gs", gs).Info("Syncing RequestReady State") + c.logger.WithField("gs", gs).Info("Syncing RequestReady State") pod, err := c.gameServerPod(gs) if err != nil { @@ -580,7 +579,7 @@ func (c *Controller) syncGameServerShutdownState(gs *stablev1alpha1.GameServer) return nil } - 
logrus.WithField("gs", gs).Info("Syncing Shutdown State") + c.logger.WithField("gs", gs).Info("Syncing Shutdown State") // Do it in the foreground, so the GameServer gets killed last p := metav1.DeletePropagationForeground err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p}) @@ -659,7 +658,7 @@ func (c Controller) Address(pod *corev1.Pod) (string, error) { } // minikube only has an InternalIP on a Node, so we'll fall back to that. - logrus.WithField("node", node.ObjectMeta.Name).Warn("Could not find ExternalIP. Falling back to Internal") + c.logger.WithField("node", node.ObjectMeta.Name).Warn("Could not find ExternalIP. Falling back to Internal") for _, a := range node.Status.Addresses { if a.Type == corev1.NodeInternalIP { return a.Address, nil @@ -682,7 +681,7 @@ func (c Controller) waitForEstablishedCRD() error { switch cond.Type { case apiv1beta1.Established: if cond.Status == apiv1beta1.ConditionTrue { - logrus.WithField("crd", crd).Info("GameServer custom resource definition is established") + c.logger.WithField("crd", crd).Info("GameServer custom resource definition is established") return true, err } } diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go index 76592baaaf..14bf46f9d3 100644 --- a/pkg/gameservers/health.go +++ b/pkg/gameservers/health.go @@ -45,6 +45,7 @@ import ( // an Unhealthy state if the GameServer main container exits when in // a Ready state type HealthController struct { + logger *logrus.Entry podSynced cache.InformerSynced podLister corelisterv1.PodLister gameServerGetter getterv1alpha1.GameServersGetter @@ -66,8 +67,10 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".HealthController"), } + hc.logger = runtime.NewLoggerWithType(hc) + eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(logrus.Infof) + eventBroadcaster.StartLogging(hc.logger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) hc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "health-controller"}) @@ -77,7 +80,7 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned if owner := metav1.GetControllerOf(pod); owner != nil && owner.Kind == "GameServer" { if v1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.Labels)) && hc.failedContainer(pod) { key := pod.ObjectMeta.Namespace + "/" + owner.Name - logrus.WithField("key", key).Info("GameServer container has terminated") + hc.logger.WithField("key", key).Info("GameServer container has terminated") hc.enqueueGameServer(key) } } @@ -109,10 +112,10 @@ func (hc *HealthController) enqueueGameServer(key string) { func (hc *HealthController) Run(stop <-chan struct{}) { defer hc.queue.ShutDown() - logrus.Info("Starting health worker") + hc.logger.Info("Starting worker") go wait.Until(hc.runWorker, time.Second, stop) <-stop - logrus.Info("Shut down health worker") + hc.logger.Info("Shut down worker") } // runWorker is a long-running function that will continually call the @@ -130,12 +133,12 @@ func (hc *HealthController) processNextWorkItem() bool { } defer hc.queue.Done(obj) - logrus.WithField("obj", obj).Info("Processing obj") + hc.logger.WithField("obj", obj).Info("Processing obj") var key string var ok bool if key, ok = obj.(string); !ok { - 
+		runtime.HandleError(hc.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
 		// this is a bad entry, we don't want to reprocess
 		hc.queue.Forget(obj)
 		return true
@@ -143,7 +146,7 @@ func (hc *HealthController) processNextWorkItem() bool {
 
 	if err := hc.syncGameServer(key); err != nil {
 		// we don't forget here, because we want this to be retried via the queue
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(hc.logger.WithField("obj", obj), err)
 		hc.queue.AddRateLimited(obj)
 		return true
 	}
@@ -154,27 +157,27 @@ func (hc *HealthController) processNextWorkItem() bool {
 
 // syncGameServer sets the GameSerer to Unhealthy, if its state is Ready
 func (hc *HealthController) syncGameServer(key string) error {
-	logrus.WithField("key", key).Info("Synchronising")
+	hc.logger.WithField("key", key).Info("Synchronising")
 
 	// Convert the namespace/name string into a distinct namespace and name
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		// don't return an error, as we don't want this retried
-		runtime.HandleError(logrus.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
+		runtime.HandleError(hc.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
 		return nil
 	}
 
 	gs, err := hc.gameServerLister.GameServers(namespace).Get(name)
 	if err != nil {
 		if k8serrors.IsNotFound(err) {
-			logrus.WithField("key", key).Info("GameServer is no longer available for syncing")
+			hc.logger.WithField("key", key).Info("GameServer is no longer available for syncing")
 			return nil
 		}
 		return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace)
 	}
 
 	if gs.Status.State == v1alpha1.Ready {
-		logrus.WithField("gs", gs).Infof("Marking GameServer as Unhealthy")
+		hc.logger.WithField("gs", gs).Infof("Marking GameServer as Unhealthy")
 		gsCopy := gs.DeepCopy()
 		gsCopy.Status.State = v1alpha1.Unhealthy
 
diff --git a/pkg/gameservers/portallocator.go b/pkg/gameservers/portallocator.go
index 4c79b3a812..67be8d1a57 100644
--- a/pkg/gameservers/portallocator.go
+++ b/pkg/gameservers/portallocator.go
@@ -42,6 +42,7 @@ type portAllocation map[int32]bool
 // The PortAllocator does not currently support mixing static portAllocations (or any pods with defined HostPort)
 // within the dynamic port range other than the ones it coordinates.
 type PortAllocator struct {
+	logger          *logrus.Entry
 	mutex           sync.RWMutex
 	portAllocations []portAllocation
 	minPort         int32
@@ -60,7 +61,6 @@ type PortAllocator struct {
 func NewPortAllocator(minPort, maxPort int32,
 	kubeInformerFactory informers.SharedInformerFactory,
 	agonesInformerFactory externalversions.SharedInformerFactory) *PortAllocator {
-	logrus.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting port allocator")
 	v1 := kubeInformerFactory.Core().V1()
 	nodes := v1.Nodes()
 
@@ -77,14 +77,16 @@ func NewPortAllocator(minPort, maxPort int32,
 		nodeInformer:       nodes.Informer(),
 		nodeSynced:         nodes.Informer().HasSynced,
 	}
+	pa.logger = runtime.NewLoggerWithType(pa)
+	pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
 
 	return pa
 }
 
 // Run sets up the current state of port allocations and
 // starts tracking Pod and Node changes
 func (pa *PortAllocator) Run(stop <-chan struct{}) error {
-	logrus.Info("Running Pod Allocator")
+	pa.logger.Info("Running")
 	pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		DeleteFunc: pa.syncDeleteGameServer,
 	})
@@ -99,7 +101,7 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
 				err := pa.syncPortAllocations(stop)
 				if err != nil {
 					err := errors.Wrap(err, "error resetting ports on node update")
-					runtime.HandleError(logrus.WithField("node", newNode), err)
+					runtime.HandleError(pa.logger.WithField("node", newNode), err)
 				}
 			}
 		},
@@ -107,12 +109,12 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
 			err := pa.syncPortAllocations(stop)
 			if err != nil {
 				err := errors.Wrap(err, "error on node deletion")
-				runtime.HandleError(logrus.WithField("node", obj), err)
+				runtime.HandleError(pa.logger.WithField("node", obj), err)
 			}
 		},
 	})
 
-	logrus.Info("Flush cache sync, before syncing gameserver and node state")
+	pa.logger.Info("Flush cache sync, before syncing gameserver and node state")
 	if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
 		return nil
 	}
@@ -150,7 +152,7 @@ func (pa *PortAllocator) syncAddNode(obj interface{}) {
 	defer pa.mutex.Unlock()
 
 	node := obj.(*corev1.Node)
-	logrus.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations")
+	pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations")
 
 	ports := portAllocation{}
 	for i := pa.minPort; i <= pa.maxPort; i++ {
@@ -164,7 +166,7 @@ func (pa *PortAllocator) syncAddNode(obj interface{}) {
 // make the HostPort available
 func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
 	gs := object.(*v1alpha1.GameServer)
-	logrus.WithField("gs", gs).Info("syncing deleted GameServer")
+	pa.logger.WithField("gs", gs).Info("syncing deleted GameServer")
 	pa.DeAllocate(gs.Spec.HostPort)
 }
 
@@ -178,7 +180,7 @@ func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error {
 	pa.mutex.Lock()
 	defer pa.mutex.Unlock()
 
-	logrus.Info("Resetting Port Allocation")
+	pa.logger.Info("Resetting Port Allocation")
 
 	if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
 		return nil
diff --git a/pkg/gameservers/sdkserver.go b/pkg/gameservers/sdkserver.go
index c8ae690f13..698066256c 100644
--- a/pkg/gameservers/sdkserver.go
+++ b/pkg/gameservers/sdkserver.go
@@ -46,6 +46,7 @@ var _ sdk.SDKServer = &SDKServer{}
 // SDKServer is a gRPC server, that is meant to be a sidecar
 // for a GameServer that will update the game server status on SDK requests
 type SDKServer struct {
+	logger           *logrus.Entry
 	gameServerName   string
 	namespace        string
 	gameServerGetter typedv1alpha1.GameServersGetter
@@ -68,18 +69,6 @@ func NewSDKServer(gameServerName, namespace string,
 	kubeClient kubernetes.Interface, agonesClient versioned.Interface) (*SDKServer, error) {
 	mux := http.NewServeMux()
 
-	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
-		_, err := w.Write([]byte("ok"))
-		if err != nil {
-			logrus.WithError(err).Error("could not send ok response on healthz")
-			w.WriteHeader(http.StatusInternalServerError)
-		}
-	})
-
-	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartLogging(logrus.Infof)
-	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-sidecar"})
 
 	s := &SDKServer{
 		gameServerName: gameServerName,
@@ -95,14 +84,27 @@ func NewSDKServer(gameServerName, namespace string,
 		healthTimeout:      healthTimeout,
 		healthMutex:        sync.RWMutex{},
 		healthFailureCount: 0,
-		recorder:           recorder,
 	}
 
+	s.logger = runtime.NewLoggerWithType(s)
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(s.logger.Infof)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	s.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-sidecar"})
+
+	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		_, err := w.Write([]byte("ok"))
+		if err != nil {
+			s.logger.WithError(err).Error("could not send ok response on healthz")
+			w.WriteHeader(http.StatusInternalServerError)
+		}
+	})
 	mux.HandleFunc("/gshealthz", func(w http.ResponseWriter, r *http.Request) {
 		if s.healthy() {
 			_, err := w.Write([]byte("ok"))
 			if err != nil {
-				logrus.WithError(err).Error("could not send ok response on gshealthz")
+				s.logger.WithError(err).Error("could not send ok response on gshealthz")
 				w.WriteHeader(http.StatusInternalServerError)
 			}
 		} else {
@@ -113,7 +115,7 @@ func NewSDKServer(gameServerName, namespace string,
 	s.initHealthLastUpdated(healthInitialDelay)
 	s.queue = s.newWorkQueue()
 
-	logrus.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar")
+	s.logger.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar")
 
 	return s, nil
 }
@@ -134,28 +136,28 @@ func (s *SDKServer) newWorkQueue() workqueue.RateLimitingInterface {
 func (s *SDKServer) Run(stop <-chan struct{}) {
 	defer s.queue.ShutDown()
 
-	logrus.Info("Starting SDKServer http health check...")
+	s.logger.Info("Starting SDKServer http health check...")
 	go func() {
 		if err := s.server.ListenAndServe(); err != nil {
 			if err == http.ErrServerClosed {
-				logrus.WithError(err).Info("health check: http server closed")
+				s.logger.WithError(err).Info("health check: http server closed")
 			} else {
 				err := errors.Wrap(err, "Could not listen on :8080")
-				runtime.HandleError(logrus.WithError(err), err)
+				runtime.HandleError(s.logger.WithError(err), err)
 			}
 		}
 	}()
 	defer s.server.Close() // nolint: errcheck
 
 	if !s.healthDisabled {
-		logrus.Info("Starting GameServer health checking")
+		s.logger.Info("Starting GameServer health checking")
 		go wait.Until(s.runHealth, s.healthTimeout, stop)
 	}
 
-	logrus.Info("Starting worker")
+	s.logger.Info("Starting worker")
 	go wait.Until(s.runWorker, time.Second, stop)
 	<-stop
-	logrus.Info("Shut down workers and health checking")
+	s.logger.Info("Shut down workers and health checking")
 }
 
 // runWorker is a long-running function that will continually call the
@@ -173,12 +175,12 @@ func (s *SDKServer) processNextWorkItem() bool {
 	}
 	defer s.queue.Done(obj)
 
-	logrus.WithField("obj", obj).Info("Processing obj")
+	s.logger.WithField("obj", obj).Info("Processing obj")
 
 	var state stablev1alpha1.State
 	var ok bool
 	if state, ok = obj.(stablev1alpha1.State); !ok {
-		runtime.HandleError(logrus.WithField("obj", obj), errors.Errorf("expected State in queue, but got %T", obj))
+		runtime.HandleError(s.logger.WithField("obj", obj), errors.Errorf("expected State in queue, but got %T", obj))
 		// this is a bad entry, we don't want to reprocess
 		s.queue.Forget(obj)
 		return true
@@ -186,7 +188,7 @@ func (s *SDKServer) processNextWorkItem() bool {
 
 	if err := s.updateState(state); err != nil {
 		// we don't forget here, because we want this to be retried via the queue
-		runtime.HandleError(logrus.WithField("obj", obj), err)
+		runtime.HandleError(s.logger.WithField("obj", obj), err)
 		s.queue.AddRateLimited(obj)
 		return true
 	}
@@ -198,7 +200,7 @@ func (s *SDKServer) processNextWorkItem() bool {
 // updateState sets the GameServer Status's state to the state
 // that has been passed through
 func (s *SDKServer) updateState(state stablev1alpha1.State) error {
-	logrus.WithField("state", state).Info("Updating state")
+	s.logger.WithField("state", state).Info("Updating state")
 	gameServers := s.gameServerGetter.GameServers(s.namespace)
 	gs, err := gameServers.Get(s.gameServerName, metav1.GetOptions{})
 	if err != nil {
@@ -207,7 +209,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.State) error {
 
 	// if the state is currently unhealthy, you can't go back to Ready
 	if gs.Status.State == stablev1alpha1.Unhealthy {
-		logrus.Info("State already unhealthy. Skipping update.")
+		s.logger.Info("State already unhealthy. Skipping update.")
Skipping update.") return nil } @@ -225,7 +227,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.State) error { // Ready enters the RequestReady state change for this GameServer into // the workqueue so it can be updated func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { - logrus.Info("Received Ready request, adding to queue") + s.logger.Info("Received Ready request, adding to queue") s.queue.AddRateLimited(stablev1alpha1.RequestReady) return e, nil } @@ -233,7 +235,7 @@ func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) // Shutdown enters the Shutdown state change for this GameServer into // the workqueue so it can be updated func (s *SDKServer) Shutdown(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { - logrus.Info("Received Shutdown request, adding to queue") + s.logger.Info("Received Shutdown request, adding to queue") s.queue.AddRateLimited(stablev1alpha1.Shutdown) return e, nil } @@ -244,13 +246,13 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error { for { _, err := stream.Recv() if err == io.EOF { - logrus.Info("Health stream closed.") + s.logger.Info("Health stream closed.") return stream.SendAndClose(&sdk.Empty{}) } if err != nil { return errors.Wrap(err, "Error with Health check") } - logrus.Info("Health Ping Received") + s.logger.Info("Health Ping Received") s.touchHealthLastUpdated() } } @@ -261,7 +263,7 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error { func (s *SDKServer) runHealth() { s.checkHealth() if !s.healthy() { - logrus.WithField("gameServerName", s.gameServerName).Info("being marked as not healthy") + s.logger.WithField("gameServerName", s.gameServerName).Info("being marked as not healthy") s.queue.AddRateLimited(stablev1alpha1.Unhealthy) } } @@ -276,7 +278,7 @@ func (s *SDKServer) touchHealthLastUpdated() { } // checkHealth checks the healthLastUpdated value -// and if it is outside the timeout value, log and +// and if it is outside the timeout value, logger and // count a failure func (s *SDKServer) checkHealth() { timeout := s.healthLastUpdated.Add(s.healthTimeout) @@ -284,7 +286,7 @@ func (s *SDKServer) checkHealth() { s.healthMutex.Lock() defer s.healthMutex.Unlock() s.healthFailureCount++ - logrus.WithField("failureCount", s.healthFailureCount).Infof("GameServer Health Check failed") + s.logger.WithField("failureCount", s.healthFailureCount).Infof("GameServer Health Check failed") } } diff --git a/pkg/util/runtime/runtime.go b/pkg/util/runtime/runtime.go index a8afa350db..433f2caef8 100644 --- a/pkg/util/runtime/runtime.go +++ b/pkg/util/runtime/runtime.go @@ -24,6 +24,8 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" ) +const sourceKey = "source" + // stackTracer is the pkg/errors stacktrace interface type stackTracer interface { StackTrace() errors.StackTrace @@ -31,6 +33,8 @@ type stackTracer interface { // replace the standard glog error logger, with a logrus one func init() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + runtime.ErrorHandlers[0] = func(err error) { if stackTrace, ok := err.(stackTracer); ok { var stack []string @@ -58,3 +62,14 @@ func Must(err error) { panic(err) } } + +// NewLoggerWithSource returns a logrus.Entry to use when you want to specify an source +func NewLoggerWithSource(source string) *logrus.Entry { + return logrus.WithField(sourceKey, source) +} + +// NewLoggerWithType returns a logrus.Entry to use when you want to use a data type as the source +// such as when you have a struct with methods +func 
+	return NewLoggerWithSource(fmt.Sprintf("%T", obj))
+}
diff --git a/pkg/util/webhooks/webhooks.go b/pkg/util/webhooks/webhooks.go
index 52116c1af9..de251ee695 100644
--- a/pkg/util/webhooks/webhooks.go
+++ b/pkg/util/webhooks/webhooks.go
@@ -34,6 +34,7 @@ type Server interface {
 
 // WebHook manage Kubernetes webhooks
 type WebHook struct {
+	logger   *logrus.Entry
 	mux      *http.ServeMux
 	server   Server
 	certFile string
@@ -60,13 +61,16 @@ func NewWebHook(certFile, keyFile string) *WebHook {
 		Handler: mux,
 	}
 
-	return &WebHook{
+	wh := &WebHook{
 		mux:      mux,
 		server:   &server,
 		certFile: certFile,
 		keyFile:  keyFile,
 		handlers: map[string][]operationHandler{},
 	}
+	wh.logger = runtime.NewLoggerWithType(wh)
+
+	return wh
 }
 
 // Run runs the webhook server, starting a https listener.
@@ -77,11 +81,11 @@ func (wh *WebHook) Run(stop <-chan struct{}) error {
 		wh.server.Close() // nolint: errcheck
 	}()
 
-	logrus.WithField("webook", wh).Infof("webhook: https server started")
+	wh.logger.WithField("webhook", wh).Infof("https server started")
 
 	err := wh.server.ListenAndServeTLS(wh.certFile, wh.keyFile)
 	if err == http.ErrServerClosed {
-		logrus.WithError(err).Info("webhook: https server closed")
+		wh.logger.WithError(err).Info("https server closed")
 		return nil
 	}
 
@@ -94,18 +98,18 @@ func (wh *WebHook) AddHandler(path string, gk schema.GroupKind, op v1beta1.Opera
 		wh.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
 			err := wh.handle(path, w, r)
 			if err != nil {
-				runtime.HandleError(logrus.WithField("url", r.URL), err)
+				runtime.HandleError(wh.logger.WithField("url", r.URL), err)
 				w.WriteHeader(http.StatusInternalServerError)
 			}
 		})
 	}
 
-	logrus.WithField("path", path).WithField("groupKind", gk).WithField("op", op).Info("Added webhook handler")
+	wh.logger.WithField("path", path).WithField("groupKind", gk).WithField("op", op).Info("Added webhook handler")
 	wh.handlers[path] = append(wh.handlers[path], operationHandler{groupKind: gk, operation: op, handler: h})
 }
 
 // handle Handles http requests for webhooks
 func (wh *WebHook) handle(path string, w http.ResponseWriter, r *http.Request) error {
-	logrus.WithField("path", path).Info("running webhook")
+	wh.logger.WithField("path", path).Info("running webhook")
 	var review v1beta1.AdmissionReview
 	err := json.NewDecoder(r.Body).Decode(&review)
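
For reference, a minimal sketch of how the two helpers added to pkg/util/runtime above (NewLoggerWithSource and NewLoggerWithType) are consumed. The allocator type and newAllocator constructor below are hypothetical stand-ins for the components this patch wires up (Controller, HealthController, PortAllocator, SDKServer, WebHook); only the two runtime helpers themselves come from the patch.

// Illustrative only, not part of the patch. The allocator type is hypothetical.
package main

import (
	"agones.dev/agones/pkg/util/runtime"
	"github.com/sirupsen/logrus"
)

// package-level logger with a fixed source, as in cmd/controller/main.go
var logger = runtime.NewLoggerWithSource("main")

// allocator is a hypothetical component that carries its own logger field,
// mirroring the pattern used for Controller, HealthController, etc.
type allocator struct {
	logger *logrus.Entry
}

func newAllocator() *allocator {
	a := &allocator{}
	// every entry logged through a.logger carries source="*main.allocator",
	// since NewLoggerWithType derives the source from fmt.Sprintf("%T", obj)
	a.logger = runtime.NewLoggerWithType(a)
	return a
}

func main() {
	logger.Info("starting")                       // JSON entry with "source":"main"
	newAllocator().logger.Info("allocator ready") // JSON entry with "source":"*main.allocator"
}

Because the runtime package's init now sets the logrus JSON formatter, any binary that imports it gets structured output with the source field without further setup, which is why the per-binary init() blocks are removed in this patch.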