Skip to content

Commit

Permalink
Merge pull request #894 from ripienaar/check_consumer_backport
Browse files Browse the repository at this point in the history
Backport consumer check command
  • Loading branch information
ripienaar authored Oct 11, 2023
2 parents 48df2a0 + 35c4b13 commit 8eced1f
Showing 1 changed file with 87 additions and 0 deletions.
87 changes: 87 additions & 0 deletions cli/server_check_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type SrvCheckCmd struct {
sourcesMessagesWarn uint64
sourcesMessagesCrit uint64

consumerName string
consumerAckOutstandingCritical int
consumerWaitingCritical int
consumerUnprocessedCritical int
consumerLastDeliveryCritical time.Duration
consumerLastAckCritical time.Duration
consumerRedeliveryCritical int

raftExpect int
raftLagCritical uint64
raftSeenCritical time.Duration
Expand Down Expand Up @@ -116,6 +124,16 @@ func configureServerCheckCommand(srv *fisk.CmdClause) {
stream.Flag("msgs-warn", "Warn if there are fewer than this many messages in the stream").PlaceHolder("MSGS").Uint64Var(&c.sourcesMessagesWarn)
stream.Flag("msgs-critical", "Critical if there are fewer than this many messages in the stream").PlaceHolder("MSGS").Uint64Var(&c.sourcesMessagesCrit)

consumer := check.Command("consumer", "Checks the health of a consumer").Action(c.checkConsumer)
consumer.Flag("stream", "The streams to check").Required().StringVar(&c.sourcesStream)
consumer.Flag("consumer", "The consumer to check").Required().StringVar(&c.consumerName)
consumer.Flag("outstanding-ack-critical", "Maximum number of outstanding acks to allow").Default("-1").IntVar(&c.consumerAckOutstandingCritical)
consumer.Flag("waiting-critical", "Maximum number of waiting pulls to allow").Default("-1").IntVar(&c.consumerWaitingCritical)
consumer.Flag("unprocessed-critical", "Maximum number of unprocessed messages to allow").Default("-1").IntVar(&c.consumerUnprocessedCritical)
consumer.Flag("last-delivery-critical", "Time to allow since the last delivery").Default("0s").DurationVar(&c.consumerLastDeliveryCritical)
consumer.Flag("last-ack-critical", "Time to allow since the last ack").Default("0s").DurationVar(&c.consumerLastAckCritical)
consumer.Flag("redelivery-critical", "Maximum number of redeliveries to allow").Default("-1").IntVar(&c.consumerRedeliveryCritical)

meta := check.Command("meta", "Check JetStream cluster state").Alias("raft").Action(c.checkRaft)
meta.Flag("expect", "Number of servers to expect").Required().PlaceHolder("SERVERS").IntVar(&c.raftExpect)
meta.Flag("lag-critical", "Critical threshold to allow for lag").PlaceHolder("OPS").Required().Uint64Var(&c.raftLagCritical)
Expand Down Expand Up @@ -475,6 +493,75 @@ func (i *perfDataItem) String() string {
return pd
}

func (c *SrvCheckCmd) checkConsumerStatus(check *result, nfo api.ConsumerInfo) {
check.pd(&perfDataItem{Name: "ack_pending", Value: float64(nfo.NumAckPending), Help: "The number of messages waiting to be Acknowledged", Crit: float64(c.consumerAckOutstandingCritical)})
check.pd(&perfDataItem{Name: "pull_waiting", Value: float64(nfo.NumWaiting), Help: "The number of waiting Pull requests", Crit: float64(c.consumerWaitingCritical)})
check.pd(&perfDataItem{Name: "pending", Value: float64(nfo.NumPending), Help: "The number of messages that have not yet been consumed", Crit: float64(c.consumerUnprocessedCritical)})
check.pd(&perfDataItem{Name: "redelivered", Value: float64(nfo.NumRedelivered), Help: "The number of messages currently being redelivered", Crit: float64(c.consumerRedeliveryCritical)})
if nfo.Delivered.Last != nil {
check.pd(&perfDataItem{Name: "last_delivery", Value: time.Since(*nfo.Delivered.Last).Seconds(), Unit: "s", Help: "Seconds since the last message was delivered", Crit: float64(c.consumerLastDeliveryCritical)})
}
if nfo.AckFloor.Last != nil {
check.pd(&perfDataItem{Name: "last_ack", Value: time.Since(*nfo.AckFloor.Last).Seconds(), Unit: "s", Help: "Seconds since the last message was acknowledged", Crit: float64(c.consumerLastDeliveryCritical)})
}

if c.consumerAckOutstandingCritical > 0 && nfo.NumAckPending >= c.consumerAckOutstandingCritical {
check.critical("Ack Pending: %d", nfo.NumAckPending)
}

if c.consumerWaitingCritical > 0 && nfo.NumWaiting >= c.consumerWaitingCritical {
check.critical("Waiting Pulls: %d", nfo.NumWaiting)
}

if c.consumerUnprocessedCritical > 0 && nfo.NumPending >= uint64(c.consumerUnprocessedCritical) {
check.critical("Unprocessed Messages: %d", nfo.NumPending)
}

if c.consumerRedeliveryCritical > 0 && nfo.NumRedelivered > c.consumerRedeliveryCritical {
check.critical("Redelivered Messages: %d", nfo.NumRedelivered)
}

switch {
case c.consumerLastDeliveryCritical <= 0:
case nfo.Delivered.Last == nil:
check.critical("No deliveries")
case time.Since(*nfo.Delivered.Last) >= c.consumerLastDeliveryCritical:
check.critical("Last delivery %v ago", time.Since(*nfo.Delivered.Last).Round(time.Second))
}

switch {
case c.consumerLastAckCritical <= 0:
case nfo.AckFloor.Last == nil:
check.critical("No acknowledgements")
case time.Since(*nfo.AckFloor.Last) >= c.consumerLastAckCritical:
check.critical("Last ack %v ago", time.Since(*nfo.AckFloor.Last).Round(time.Second))
}
}

func (c *SrvCheckCmd) checkConsumer(_ *fisk.ParseContext) error {
check := &result{Name: fmt.Sprintf("%s_%s", c.sourcesStream, c.consumerName), Check: "consumer"}
defer check.GenericExit()

_, mgr, err := prepareHelper("", natsOpts()...)
check.criticalIfErr(err, "connection failed: %s", err)

cons, err := mgr.LoadConsumer(c.sourcesStream, c.consumerName)
if err != nil {
check.critical("consumer load failure: %v", err)
return nil
}

nfo, err := cons.LatestState()
if err != nil {
check.critical("consumer state failure: %v", err)
return nil
}

c.checkConsumerStatus(check, nfo)

return nil
}

func (c *SrvCheckCmd) checkKVStatusAndBucket(check *result, nc *nats.Conn) {
js, err := nc.JetStream()
check.criticalIfErr(err, "connection failed: %v", err)
Expand Down

0 comments on commit 8eced1f

Please sign in to comment.