From a5a3dd2f284f910a95e798e74b5f9bb6a8358a11 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Fri, 1 Nov 2019 16:23:11 +0100 Subject: [PATCH 1/3] fixed panic on calling updateMetadata on closed client --- client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client.go b/client.go index 040cfe9e3..7833d3f2a 100644 --- a/client.go +++ b/client.go @@ -833,6 +833,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) { + if client.Closed() { + return + } + client.lock.Lock() defer client.lock.Unlock() From 93f4001ec386a68fee0ebd8380416d9cf19ff17a Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Sat, 2 Nov 2019 22:44:38 +0100 Subject: [PATCH 2/3] fixed race conditions and nil panic on calling registerBroker after close --- client.go | 8 ++++++++ consumer_group.go | 28 ++++++++++++++++------------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index 7833d3f2a..e9bdecf03 100644 --- a/client.go +++ b/client.go @@ -242,6 +242,9 @@ func (client *client) Close() error { } func (client *client) Closed() bool { + client.lock.RLock() + defer client.lock.RUnlock() + return client.brokers == nil } @@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error { // in the brokers map. It returns the broker that is registered, which may be the provided broker, // or a previously registered Broker instance. You must hold the write lock before calling this function. func (client *client) registerBroker(broker *Broker) { + if client.brokers == nil { + Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr()) + return + } + if client.brokers[broker.ID()] == nil { client.brokers[broker.ID()] = broker Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) diff --git a/consumer_group.go b/consumer_group.go index da99e8811..90ff14ee0 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -417,12 +417,6 @@ func (c *consumerGroup) leave() error { } func (c *consumerGroup) handleError(err error, topic string, partition int32) { - select { - case <-c.closed: - return - default: - } - if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 { err = &ConsumerError{ Topic: topic, @@ -431,13 +425,23 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { } } - if c.config.Consumer.Return.Errors { - select { - case c.errors <- err: - default: - } - } else { + if !c.config.Consumer.Return.Errors { Logger.Println(err) + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + select { + case <-c.closed: + return + default: + } + + select { + case c.errors <- err: + default: } } From e56a4a9baa7d83a60b70b111cd8251dc5e30687a Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Tue, 12 Nov 2019 20:22:52 +0100 Subject: [PATCH 3/3] added comments for handleError --- consumer_group.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consumer_group.go b/consumer_group.go index 90ff14ee0..b974dd9af 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -435,6 +435,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { select { case <-c.closed: + //consumer is closed return default: } @@ -442,6 +443,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { select { case c.errors <- err: default: + // no error listener } }