Skip to content

Commit

Permalink
fix: make unhealthy signal send non-blocking (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbhor authored Sep 19, 2024
1 parent 98fb5f2 commit b5059eb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
21 changes: 18 additions & 3 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ func (re *Relay) Start(globalCtx context.Context) error {
// Start the consumer group worker by triggering a signal to the relay loop so that it
// fetches a consumer worker for the initial healthy node.
re.log.Info("starting consumer worker")
re.signalCh <- struct{}{}
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
// when the threshold checker goroutine may have already sent on the channel concurrently.
select {
case re.signalCh <- struct{}{}:
default:
}

wg.Add(1)
// Relay teardown.
Expand Down Expand Up @@ -179,7 +184,12 @@ loop:
of, err := re.source.GetHighWatermark(ctx, server.Client)
if err != nil {
re.log.Error("could not get end offsets (first poll); sending unhealthy signal", "id", server.ID, "server", server.Config.BootstrapBrokers, "error", err)
re.signalCh <- struct{}{}
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
// when the threshold checker goroutine may have already sent on the channel concurrently.
select {
case re.signalCh <- struct{}{}:
default:
}

continue loop
}
Expand All @@ -197,7 +207,12 @@ loop:
fetches, err := re.source.GetFetches(server)
if err != nil {
re.log.Error("marking server as unhealthy", "server", server.ID)
re.signalCh <- struct{}{}
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
// when the threshold checker goroutine may have already sent on the channel concurrently.
select {
case re.signalCh <- struct{}{}:
default:
}

continue loop
}
Expand Down
7 changes: 6 additions & 1 deletion internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,12 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err
sp.fetchCancel()

// Signal the relay poll loop to start asking for a healthy client.
signal <- struct{}{}
// The push is non-blocking to avoid getting stuck trying to send on the poll loop
// if the poll loop's error-checking section has already sent a signal.
select {
case signal <- struct{}{}:
default:
}
}
}
}
Expand Down

0 comments on commit b5059eb

Please sign in to comment.