diff --git a/app/app_test.go b/app/app_test.go index 5c64272eb5..4a5be14f7d 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -18,6 +18,7 @@ import ( "github.com/facebookgo/inject" "github.com/facebookgo/startstop" + "github.com/jonboulle/clockwork" "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,6 +29,7 @@ import ( "github.com/honeycombio/libhoney-go/transmission" "github.com/honeycombio/refinery/collect" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" @@ -189,6 +191,8 @@ func newStartedApp( &inject.Object{Value: metricsr, Name: "peerMetrics"}, &inject.Object{Value: "test", Name: "version"}, &inject.Object{Value: samplerFactory}, + &inject.Object{Value: &health.Health{}}, + &inject.Object{Value: clockwork.NewRealClock()}, &inject.Object{Value: &collect.MockStressReliever{}, Name: "stressRelief"}, &inject.Object{Value: &a}, ) diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index 103029a288..6a5310ad11 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -25,6 +25,7 @@ import ( "github.com/honeycombio/refinery/app" "github.com/honeycombio/refinery/collect" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/otelutil" "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" @@ -264,6 +265,7 @@ func main() { {Value: version, Name: "version"}, {Value: samplerFactory}, {Value: stressRelief, Name: "stressRelief"}, + {Value: health.Health{}}, {Value: &a}, } err = g.Provide(objects...) diff --git a/collect/stressRelief.go b/collect/stressRelief.go index 4dfc4b0914..02632d044e 100644 --- a/collect/stressRelief.go +++ b/collect/stressRelief.go @@ -8,6 +8,7 @@ import ( "github.com/dgryski/go-wyhash" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" ) @@ -59,6 +60,7 @@ type StressRelief struct { minDuration time.Duration RefineryMetrics metrics.Metrics `inject:"metrics"` Logger logger.Logger `inject:""` + Health health.Recorder `inject:""` Done chan struct{} algorithms map[string]func(string, string) float64 @@ -66,10 +68,15 @@ type StressRelief struct { lock sync.RWMutex } +const StressReliefHealthKey = "stress_relief" + func (s *StressRelief) Start() error { s.Logger.Debug().Logf("Starting StressRelief system") defer func() { s.Logger.Debug().Logf("Finished starting StressRelief system") }() + // register with health + s.Health.Register(StressReliefHealthKey, 3*time.Second) + // We use an algorithms map so that we can name these algorithms, which makes it easier for several things: // - change our mind about which algorithm to use // - logging the algorithm actually used @@ -93,6 +100,7 @@ func (s *StressRelief) Start() error { } // start our monitor goroutine that periodically calls recalc + // and also reports that it's healthy go func(s *StressRelief) { tick := time.NewTicker(100 * time.Millisecond) defer tick.Stop() @@ -100,7 +108,9 @@ func (s *StressRelief) Start() error { select { case <-tick.C: s.Recalc() + s.Health.Ready(StressReliefHealthKey, true) case <-s.Done: + s.Health.Unregister(StressReliefHealthKey) s.Logger.Debug().Logf("Stopping StressRelief system") return } diff --git a/internal/health/health.go b/internal/health/health.go new file mode 100644 index 0000000000..96794388b5 --- /dev/null +++ b/internal/health/health.go @@ -0,0 +1,246 @@ +package health + +import ( + "sync" + "time" + + "github.com/facebookgo/startstop" + "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" + "github.com/jonboulle/clockwork" +) + +// We need a Health object that can be used by: +// - internal subsystems to tell it their readiness to receive traffic +// - the router to read back that data for reporting when it receives a health or readiness request +// either on grpc or on http +// We want that object in its own package so we don't have import cycles + +// We register a subsystem with an expected interval for reporting and if it +// doesn't report for a time exceeding the duration of that interval, we will +// mark it (and the whole application) as unhealthy (not alive). A subsystem can +// also report that it is alive but not ready; when this happens, we will mark +// it as not ready and the system as a whole as not ready but still alive. This +// is useful during shutdown. + +// Subsystems will typically Register during their startup, and then call Ready +// frequently once they are ready to receive traffic. Note that Registration +// does not start the ticker -- it only starts once Ready is called for the +// first time. + +// Recorder is the interface used by object that want to record their own health +// status and make it available to the system. +type Recorder interface { + Register(subsystem string, timeout time.Duration) + Unregister(subsystem string) + Ready(subsystem string, ready bool) +} + +// Reporter is the interface that is used to read back the health status of the system. +type Reporter interface { + IsAlive() bool + IsReady() bool +} + +// TickerTime is the interval at which we will survey health of all of the +// subsystems. We will decrement the counters for each subsystem that has +// registered. If a counter reaches 0, we will mark the subsystem as dead. This +// value should generally be less than the duration of any reporting timeout in +// the system. +var TickerTime = 500 * time.Millisecond + +// The Health object is the main object that subsystems will interact with. When +// subsystems are registered, they will be expected to report in at least once +// every timeout interval. If they don't, they will be marked as not alive. +type Health struct { + Clock clockwork.Clock `inject:""` + Metrics metrics.Metrics `inject:"genericMetrics"` + Logger logger.Logger `inject:""` + timeouts map[string]time.Duration + timeLeft map[string]time.Duration + readies map[string]bool + alives map[string]bool + mut sync.RWMutex + done chan struct{} + startstop.Starter + startstop.Stopper + Recorder + Reporter +} + +func (h *Health) Start() error { + // if we don't have a logger or metrics object, we'll use the null ones (makes testing easier) + if h.Logger == nil { + h.Logger = &logger.NullLogger{} + } + if h.Metrics == nil { + h.Metrics = &metrics.NullMetrics{} + } + h.timeouts = make(map[string]time.Duration) + h.timeLeft = make(map[string]time.Duration) + h.readies = make(map[string]bool) + h.alives = make(map[string]bool) + h.done = make(chan struct{}) + go h.ticker() + return nil +} + +func (h *Health) Stop() error { + close(h.done) + return nil +} + +func (h *Health) ticker() { + tick := h.Clock.NewTicker(TickerTime) + for { + select { + case <-tick.Chan(): + h.mut.Lock() + for subsystem, timeLeft := range h.timeLeft { + // only decrement positive counters since 0 means we're dead + if timeLeft > 0 { + h.timeLeft[subsystem] -= TickerTime + if h.timeLeft[subsystem] < 0 { + h.timeLeft[subsystem] = 0 + } + } + } + h.mut.Unlock() + case <-h.done: + return + } + } +} + +// Register a subsystem with the health system. The timeout is the maximum +// expected interval between subsystem reports. If Ready is not called within +// that interval (beginning from the time of calling Ready for the first time), +// it (and the entire server) will be marked as not alive. +func (h *Health) Register(subsystem string, timeout time.Duration) { + h.mut.Lock() + defer h.mut.Unlock() + h.timeouts[subsystem] = timeout + h.readies[subsystem] = false + // we use a negative value to indicate that we haven't seen a report yet so + // we don't return "dead" immediately + h.timeLeft[subsystem] = -1 + fields := map[string]any{ + "source": subsystem, + "timeout": timeout, + } + h.Logger.Debug().WithFields(fields).Logf("Registered Health ticker", subsystem, timeout) + if timeout < TickerTime { + h.Logger.Error().WithFields(fields).Logf("Registering a timeout less than the ticker time") + } +} + +// Unregister a subsystem with the health system. This marks the subsystem as not +// ready and removes it from the alive tracking. It also means that it no longer +// needs to report in. If it does report in, the report will be ignored. +func (h *Health) Unregister(subsystem string) { + h.mut.Lock() + defer h.mut.Unlock() + delete(h.timeouts, subsystem) + delete(h.timeLeft, subsystem) + delete(h.alives, subsystem) + + // we don't remove it from readies, but we mark it as not ready; + // an unregistered subsystem can never be ready. + h.readies[subsystem] = false +} + +// Ready is called by subsystems with a flag to indicate their readiness to +// receive traffic. If any subsystem is not ready, the system as a whole is not +// ready. Even unready subsystems will be marked as alive as long as they report +// in. +func (h *Health) Ready(subsystem string, ready bool) { + h.mut.Lock() + defer h.mut.Unlock() + if _, ok := h.timeouts[subsystem]; !ok { + // if a subsystem has an entry in readies but not in timeouts, it means + // it had called Unregister but is still reporting in. This is not an error. + if _, ok := h.readies[subsystem]; !ok { + // but if it was never registered, it IS an error + h.Logger.Error().WithField("subsystem", subsystem).Logf("Health.Ready called for unregistered subsystem") + } + return + } + if h.readies[subsystem] != ready { + h.Logger.Info().WithFields(map[string]any{ + "subsystem": subsystem, + "ready": ready, + }).Logf("Health.Ready reporting subsystem changing state") + } + h.readies[subsystem] = ready + h.timeLeft[subsystem] = h.timeouts[subsystem] + if !h.alives[subsystem] { + h.alives[subsystem] = true + h.Logger.Info().WithField("subsystem", subsystem).Logf("Health.Ready reporting subsystem alive") + } + h.Metrics.Gauge("is_ready", h.checkReady()) + h.Metrics.Gauge("is_alive", h.checkAlive()) +} + +// IsAlive returns true if all registered subsystems are alive +func (h *Health) IsAlive() bool { + h.mut.Lock() + defer h.mut.Unlock() + return h.checkAlive() +} + +// checkAlive returns true if all registered subsystems are alive +// only call with a write lock held +func (h *Health) checkAlive() bool { + // if any counter is 0, we're dead + for subsystem, a := range h.timeLeft { + if a == 0 { + if h.alives[subsystem] { + h.Logger.Error().WithField("subsystem", subsystem).Logf("IsAlive: subsystem dead due to timeout") + h.alives[subsystem] = false + } + return false + } + } + return true +} + +// IsReady returns true if all registered subsystems are ready +func (h *Health) IsReady() bool { + h.mut.RLock() + defer h.mut.RUnlock() + return h.checkReady() +} + +// checkReady returns true if all registered subsystems are ready +// only call with the lock held +func (h *Health) checkReady() bool { + // if no one has registered yet, we're not ready + if len(h.readies) == 0 { + h.Logger.Debug().Logf("IsReady: no one has registered yet") + return false + } + + // if any counter is not positive, we're not ready + for subsystem, counter := range h.timeLeft { + if counter <= 0 { + h.Logger.Info().WithFields(map[string]any{ + "subsystem": subsystem, + "counter": counter, + }).Logf("Health.IsReady failed due to counter <= 0") + return false + } + } + + // if any registered subsystem is not ready, we're not ready + ready := true + for subsystem, r := range h.readies { + if !r { + h.Logger.Info().WithFields(map[string]any{ + "subsystem": subsystem, + "ready": ready, + }).Logf("Health.IsReady reporting subsystem not ready") + } + ready = ready && r + } + return ready +} diff --git a/internal/health/health_test.go b/internal/health/health_test.go new file mode 100644 index 0000000000..1b9f783f56 --- /dev/null +++ b/internal/health/health_test.go @@ -0,0 +1,145 @@ +package health + +import ( + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" +) + +func TestHealthStartup(t *testing.T) { + // Create a new Health object + cl := clockwork.NewFakeClock() + h := &Health{ + Clock: cl, + } + // Start the Health object + h.Start() + + // at time 0 with no registrations, it should be alive and not ready + assert.True(t, h.IsAlive()) + assert.False(t, h.IsReady()) + // Stop the Health object + h.Stop() +} + +func TestHealthRegistrationNotReady(t *testing.T) { + // Create a new Health object + cl := clockwork.NewFakeClock() + h := &Health{ + Clock: cl, + } + // Start the Health object + h.Start() + // at time 0 with no registrations, it should be alive and not ready + assert.True(t, h.IsAlive()) + assert.False(t, h.IsReady()) + + // register a service that will never report in + h.Register("foo", 1500*time.Millisecond) + // now it should also be alive and not ready + assert.True(t, h.IsAlive()) + assert.False(t, h.IsReady()) + + // and even after the timeout, it should still be alive and not ready + for i := 0; i < 10; i++ { + cl.Advance(500 * time.Millisecond) + time.Sleep(1 * time.Millisecond) // give goroutines time to run + } + assert.True(t, h.IsAlive()) + assert.False(t, h.IsReady()) + // Stop the Health object + h.Stop() +} + +func TestHealthRegistrationAndReady(t *testing.T) { + // Create a new Health object + cl := clockwork.NewFakeClock() + h := &Health{ + Clock: cl, + } + // Start the Health object + h.Start() + // register a service + h.Register("foo", 1500*time.Millisecond) + cl.Advance(500 * time.Millisecond) + // Tell h we're ready + h.Ready("foo", true) + // now h should also be alive and ready + assert.True(t, h.IsAlive()) + assert.True(t, h.IsReady()) + + // make some periodic ready calls, it should stay alive and ready + for i := 0; i < 10; i++ { + h.Ready("foo", true) + cl.Advance(500 * time.Millisecond) + time.Sleep(1 * time.Millisecond) // give goroutines time to run + assert.True(t, h.IsAlive()) + assert.True(t, h.IsReady()) + } + + // now run for a bit with no ready calls, it should be dead and not ready + for i := 0; i < 10; i++ { + cl.Advance(500 * time.Millisecond) + time.Sleep(1 * time.Millisecond) // give goroutines time to run + } + assert.False(t, h.IsAlive()) + assert.False(t, h.IsReady()) + // Stop the Health object + h.Stop() +} + +func TestHealthReadyFalse(t *testing.T) { + // Create a new Health object + cl := clockwork.NewFakeClock() + h := &Health{ + Clock: cl, + } + // Start the Health object + h.Start() + // register a service + h.Register("foo", 1500*time.Millisecond) + h.Ready("foo", true) + + cl.Advance(500 * time.Millisecond) + time.Sleep(1 * time.Millisecond) // give goroutines time to run + assert.True(t, h.IsAlive()) + assert.True(t, h.IsReady()) + + // tell it we're not ready + h.Ready("foo", false) + cl.Advance(500 * time.Millisecond) + time.Sleep(1 * time.Millisecond) // give goroutines time to run + assert.True(t, h.IsAlive()) + assert.False(t, h.IsReady()) + // Stop the Health object + h.Stop() +} + +func TestNotReadyFromOneService(t *testing.T) { + // Create a new Health object + cl := clockwork.NewFakeClock() + h := &Health{ + Clock: cl, + } + // Start the Health object + h.Start() + h.Register("foo", 1500*time.Millisecond) + h.Register("bar", 1500*time.Millisecond) + h.Register("baz", 1500*time.Millisecond) + h.Ready("foo", true) + h.Ready("bar", true) + h.Ready("baz", true) + assert.True(t, h.IsAlive()) + assert.True(t, h.IsReady()) + + // make bar not ready + h.Ready("bar", false) + cl.Advance(500 * time.Millisecond) + time.Sleep(1 * time.Millisecond) // give goroutines time to run + assert.True(t, h.IsAlive()) + assert.False(t, h.IsReady()) + // Stop the Health object + h.Stop() +} diff --git a/route/route.go b/route/route.go index f7166fd29b..ceeb74370f 100644 --- a/route/route.go +++ b/route/route.go @@ -23,6 +23,7 @@ import ( "github.com/pelletier/go-toml/v2" "github.com/vmihailenco/msgpack/v5" "google.golang.org/grpc" + healthserver "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -35,6 +36,7 @@ import ( "github.com/honeycombio/refinery/collect" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/sharder" @@ -59,6 +61,7 @@ const ( type Router struct { Config config.Config `inject:""` Logger logger.Logger `inject:""` + Health health.Reporter `inject:""` HTTPTransport *http.Transport `inject:"upstreamTransport"` UpstreamTransmission transmit.Transmission `inject:"upstreamTransmission"` PeerTransmission transmit.Transmission `inject:"peerTransmission"` @@ -84,8 +87,10 @@ type Router struct { server *http.Server grpcServer *grpc.Server doneWG sync.WaitGroup + donech chan struct{} environmentCache *environmentCache + hsrv *healthserver.Server } type BatchResponse struct { @@ -152,8 +157,8 @@ func (r *Router) LnS(incomingOrPeer string) { muxxer.Use(r.requestLogger) muxxer.Use(r.panicCatcher) - // answer a basic health check locally muxxer.HandleFunc("/alive", r.alive).Name("local health") + muxxer.HandleFunc("/ready", r.ready).Name("local readiness") muxxer.HandleFunc("/panic", r.panic).Name("intentional panic") muxxer.HandleFunc("/version", r.version).Name("report version info") @@ -209,6 +214,7 @@ func (r *Router) LnS(incomingOrPeer string) { IdleTimeout: r.Config.GetHTTPIdleTimeout(), } + r.donech = make(chan struct{}) if r.Config.GetGRPCEnabled() && len(grpcAddr) > 0 { l, err := net.Listen("tcp", grpcAddr) if err != nil { @@ -236,7 +242,11 @@ func (r *Router) LnS(incomingOrPeer string) { logsServer := NewLogsServer(r) collectorlogs.RegisterLogsServiceServer(r.grpcServer, logsServer) - grpc_health_v1.RegisterHealthServer(r.grpcServer, r) + // health check -- manufactured by grpc health package + r.hsrv = healthserver.NewServer() + grpc_health_v1.RegisterHealthServer(r.grpcServer, r.hsrv) + r.startGRPCHealthMonitor() + go r.grpcServer.Serve(l) } @@ -262,13 +272,35 @@ func (r *Router) Stop() error { if r.grpcServer != nil { r.grpcServer.GracefulStop() } + close(r.donech) r.doneWG.Wait() return nil } func (r *Router) alive(w http.ResponseWriter, req *http.Request) { - r.iopLogger.Debug().Logf("answered /x/alive check") - w.Write([]byte(`{"source":"refinery","alive":"yes"}`)) + r.iopLogger.Debug().Logf("answered /alive check") + + alive := r.Health.IsAlive() + r.Metrics.Gauge("is_alive", alive) + if !alive { + w.WriteHeader(http.StatusServiceUnavailable) + r.marshalToFormat(w, map[string]interface{}{"source": "refinery", "alive": "no"}, "json") + return + } + r.marshalToFormat(w, map[string]interface{}{"source": "refinery", "alive": "yes"}, "json") +} + +func (r *Router) ready(w http.ResponseWriter, req *http.Request) { + r.iopLogger.Debug().Logf("answered /ready check") + + ready := r.Health.IsReady() + r.Metrics.Gauge("is_ready", ready) + if !ready { + w.WriteHeader(http.StatusServiceUnavailable) + r.marshalToFormat(w, map[string]interface{}{"source": "refinery", "ready": "no"}, "json") + return + } + r.marshalToFormat(w, map[string]interface{}{"source": "refinery", "ready": "yes"}, "json") } func (r *Router) panic(w http.ResponseWriter, req *http.Request) { @@ -947,6 +979,46 @@ func (r *Router) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_healt }) } +// startGRPCHealthMonitor starts a goroutine that periodically checks the health of the system and updates the grpc health server +func (r *Router) startGRPCHealthMonitor() { + const ( + system = "" // empty string represents the generic health of the whole system (corresponds to "ready") + systemReady = "ready" + systemAlive = "alive" + ) + r.iopLogger.Debug().Logf("running grpc health monitor") + + setStatus := func(svc string, stat bool) { + if stat { + r.hsrv.SetServingStatus(svc, grpc_health_v1.HealthCheckResponse_SERVING) + } else { + r.hsrv.SetServingStatus(svc, grpc_health_v1.HealthCheckResponse_NOT_SERVING) + } + } + + r.doneWG.Add(1) + go func() { + defer r.doneWG.Done() + // TODO: Does this time need to be configurable? + watchticker := time.NewTicker(3 * time.Second) + defer watchticker.Stop() + for { + select { + case <-watchticker.C: + alive := r.Health.IsAlive() + ready := r.Health.IsReady() + + // we can just update everything because the grpc health server will only send updates if the status changes + setStatus(systemReady, ready) + setStatus(systemAlive, alive) + setStatus(system, ready && alive) + case <-r.donech: + return + } + } + }() +} + // AddOTLPMuxxer adds muxxer for OTLP requests func (r *Router) AddOTLPMuxxer(muxxer *mux.Router) { // require an auth header for OTLP requests diff --git a/route/route_test.go b/route/route_test.go index 1de8677c30..e2f87f63d5 100644 --- a/route/route_test.go +++ b/route/route_test.go @@ -17,10 +17,12 @@ import ( "github.com/facebookgo/inject" "github.com/honeycombio/refinery/collect" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/internal/health" "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/transmit" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace/noop" @@ -485,6 +487,8 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"}, &inject.Object{Value: &collect.MockStressReliever{}, Name: "stressRelief"}, &inject.Object{Value: &peer.MockPeers{}}, + &inject.Object{Value: &health.Health{}}, + &inject.Object{Value: clockwork.NewFakeClock()}, ) if err != nil { t.Error(err)