diff --git a/client.go b/client.go index b786cd9db..bd68520e4 100644 --- a/client.go +++ b/client.go @@ -5,6 +5,7 @@ import ( "math/rand" "sort" "sync" + "sync/atomic" "time" ) @@ -133,6 +134,8 @@ type client struct { cachedPartitionsResults map[string][maxPartitionIndex][]int32 lock sync.RWMutex // protects access to the maps that hold cluster state. + + updateMetaDataMs int64 //store update metadata time } // NewClient creates a new Client. It connects to one of the given broker addresses @@ -877,10 +880,16 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout") return err } - Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining) if backoff > 0 { time.Sleep(backoff) } + + t := atomic.LoadInt64(&client.updateMetaDataMs) + if time.Since(time.Unix(t/1e3, 0)) < backoff { + return err + } + Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining) + return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline) } return err @@ -903,6 +912,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, } else if client.conf.Version.IsAtLeast(V0_10_0_0) { req.Version = 1 } + + t := atomic.LoadInt64(&client.updateMetaDataMs) + if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) { + return nil + } + response, err := broker.GetMetadata(req) var kerror KError var packetEncodingError PacketEncodingError diff --git a/client_test.go b/client_test.go index 4fe4873d2..b659719c9 100644 --- a/client_test.go +++ b/client_test.go @@ -813,6 +813,44 @@ func TestClientMetadataTimeout(t *testing.T) { } } +func TestClientUpdateMetadataErrorAndRetry(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), 1) + seedBroker.Returns(metadataResponse1) + + config := NewTestConfig() + config.Metadata.Retry.Max = 3 + config.Metadata.Retry.Backoff = 200 * time.Millisecond + config.Metadata.RefreshFrequency = 0 + config.Net.ReadTimeout = 10 * time.Millisecond + config.Net.WriteTimeout = 10 * time.Millisecond + client, err := NewClient([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + waitGroup := sync.WaitGroup{} + waitGroup.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer waitGroup.Done() + var failedMetadataResponse MetadataResponse + failedMetadataResponse.AddBroker(seedBroker.Addr(), 1) + failedMetadataResponse.AddTopic("new_topic", ErrUnknownTopicOrPartition) + seedBroker.Returns(&failedMetadataResponse) + err := client.RefreshMetadata() + if err == nil { + t.Error("should return error") + return + } + }() + } + waitGroup.Wait() + safeClose(t, client) + seedBroker.Close() +} + func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) staleCoordinator := NewMockBroker(t, 2)