Skip to content

Commit

Permalink
Clear all metadata when we have the latest info
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Mar 8, 2018
1 parent eae9146 commit 407fc37
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)

switch err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
// valid response, use it
shouldRetry, err := client.updateMetadata(response)
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
if shouldRetry {
Logger.Println("client/metadata found some partitions to be leaderless")
return retry(err) // note: err can be nil
Expand All @@ -674,7 +675,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) {
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
client.lock.Lock()
defer client.lock.Unlock()

Expand All @@ -685,7 +686,10 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
for _, broker := range data.Brokers {
client.registerBroker(broker)
}

if allKnownMetaData {
client.metadata = make(map[string]map[int32]*PartitionMetadata)
client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
}
for _, topic := range data.Topics {
delete(client.metadata, topic.Name)
delete(client.cachedPartitionsResults, topic.Name)
Expand Down

0 comments on commit 407fc37

Please sign in to comment.