diff --git a/client.go b/client.go index 040cfe9e3..e9bdecf03 100644 --- a/client.go +++ b/client.go @@ -242,6 +242,9 @@ func (client *client) Close() error { } func (client *client) Closed() bool { + client.lock.RLock() + defer client.lock.RUnlock() + return client.brokers == nil } @@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error { // in the brokers map. It returns the broker that is registered, which may be the provided broker, // or a previously registered Broker instance. You must hold the write lock before calling this function. func (client *client) registerBroker(broker *Broker) { + if client.brokers == nil { + Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr()) + return + } + if client.brokers[broker.ID()] == nil { client.brokers[broker.ID()] = broker Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) @@ -833,6 +841,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) { + if client.Closed() { + return + } + client.lock.Lock() defer client.lock.Unlock() diff --git a/consumer_group.go b/consumer_group.go index da99e8811..b974dd9af 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -417,12 +417,6 @@ func (c *consumerGroup) leave() error { } func (c *consumerGroup) handleError(err error, topic string, partition int32) { - select { - case <-c.closed: - return - default: - } - if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 { err = &ConsumerError{ Topic: topic, @@ -431,13 +425,25 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { } } - if c.config.Consumer.Return.Errors { - select { - case c.errors <- err: - default: - } - } else { + if !c.config.Consumer.Return.Errors { Logger.Println(err) + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + select { + case <-c.closed: + //consumer is closed + return + default: + } + + select { + case c.errors <- err: + default: + // no error listener } }