Skip to content

Commit

Permalink
Merge pull request #1531 from roblaszczak/master
Browse files Browse the repository at this point in the history
fixed panic on calling updateMetadata on closed client
  • Loading branch information
bai authored Nov 12, 2019
2 parents 9ad3f01 + e56a4a9 commit afedeca
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
12 changes: 12 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ func (client *client) Close() error {
}

func (client *client) Closed() bool {
client.lock.RLock()
defer client.lock.RUnlock()

return client.brokers == nil
}

Expand Down Expand Up @@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
if client.brokers == nil {
Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
return
}

if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
Expand Down Expand Up @@ -833,6 +841,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
if client.Closed() {
return
}

client.lock.Lock()
defer client.lock.Unlock()

Expand Down
30 changes: 18 additions & 12 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,6 @@ func (c *consumerGroup) leave() error {
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
select {
case <-c.closed:
return
default:
}

if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
err = &ConsumerError{
Topic: topic,
Expand All @@ -431,13 +425,25 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
}
}

if c.config.Consumer.Return.Errors {
select {
case c.errors <- err:
default:
}
} else {
if !c.config.Consumer.Return.Errors {
Logger.Println(err)
return
}

c.lock.Lock()
defer c.lock.Unlock()

select {
case <-c.closed:
//consumer is closed
return
default:
}

select {
case c.errors <- err:
default:
// no error listener
}
}

Expand Down

0 comments on commit afedeca

Please sign in to comment.