diff --git a/client.go b/client.go index 937000a7b..f5563e938 100644 --- a/client.go +++ b/client.go @@ -649,8 +649,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) switch err.(type) { case nil: + allKnownMetaData := len(topics) == 0 // valid response, use it - shouldRetry, err := client.updateMetadata(response) + shouldRetry, err := client.updateMetadata(response, allKnownMetaData) if shouldRetry { Logger.Println("client/metadata found some partitions to be leaderless") return retry(err) // note: err can be nil @@ -674,7 +675,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) } // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable -func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) { +func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) { client.lock.Lock() defer client.lock.Unlock() @@ -685,7 +686,10 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er for _, broker := range data.Brokers { client.registerBroker(broker) } - + if allKnownMetaData { + client.metadata = make(map[string]map[int32]*PartitionMetadata) + client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32) + } for _, topic := range data.Topics { delete(client.metadata, topic.Name) delete(client.cachedPartitionsResults, topic.Name)