From c42b2e0ed061fbdbade7fcb86ddf88a000a7bc4d Mon Sep 17 00:00:00 2001 From: HaoSunUber <86338940+HaoSunUber@users.noreply.github.com> Date: Fri, 13 Oct 2023 13:54:35 -0700 Subject: [PATCH] fix(client): ignore empty Metadata responses when refreshing (#2672) We should skip the metadata refresh if the startup phase broker returns empty brokers in metadata response. The Java client skips the empty response to update the metadata cache (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1149) and we should make a feature parity in Sarama too Fixes #2664 Signed-off-by: Hao Sun --- client.go | 7 ++++ client_test.go | 86 ++++++++++++++++++++++++++++++++++++++---- client_tls_test.go | 4 +- offset_manager_test.go | 4 +- sync_producer_test.go | 1 + 5 files changed, 92 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 5e665eaef..c6364eead 100644 --- a/client.go +++ b/client.go @@ -1035,6 +1035,13 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, var kerror KError var packetEncodingError PacketEncodingError if err == nil { + // When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924). + if len(response.Brokers) == 0 { + Logger.Println("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr) + _ = broker.Close() + client.deregisterBroker(broker) + continue + } allKnownMetaData := len(topics) == 0 // valid response, use it shouldRetry, err := client.updateMetadata(response, allKnownMetaData) diff --git a/client_test.go b/client_test.go index 78243bce0..155827a00 100644 --- a/client_test.go +++ b/client_test.go @@ -23,7 +23,9 @@ func safeClose(t testing.TB, c io.Closer) { func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -92,6 +94,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -111,6 +114,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse) @@ -358,6 +362,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) retryCount := int32(0) @@ -375,6 +380,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { metadataUnknownTopic := new(MetadataResponse) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) + metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -395,6 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) config := NewTestConfig() @@ -406,6 +413,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) { } metadataUnknownTopic := new(MetadataResponse) + metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataUnknownTopic) seedBroker.Returns(metadataUnknownTopic) @@ -481,6 +489,53 @@ func TestClientReceivingPartialMetadata(t *testing.T) { leader.Close() } +func TestClientRefreshBehaviourWhenEmptyMetadataResponse(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + broker := NewMockBroker(t, 2) + + metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse1) + + c, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + client := c.(*client) + if len(client.seedBrokers) != 1 { + t.Error("incorrect number of live seeds") + } + if len(client.deadSeeds) != 0 { + t.Error("incorrect number of dead seeds") + } + if len(client.brokers) != 1 { + t.Error("incorrect number of brokers") + } + + // Empty metadata response + seedBroker.Returns(new(MetadataResponse)) + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + metadataResponse2.AddBroker(broker.Addr(), broker.BrokerID()) + seedBroker.Returns(metadataResponse2) + err = c.RefreshMetadata() + if err != nil { + t.Fatal(err) + } + if len(client.seedBrokers) != 1 { + t.Error("incorrect number of live seeds") + } + if len(client.deadSeeds) != 0 { + t.Error("incorrect number of dead seeds") + } + if len(client.brokers) != 2 { + t.Error("incorrect number of brokers") + } + broker.Close() + seedBroker.Close() + safeClose(t, client) +} + func TestClientRefreshBehaviour(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -633,8 +688,9 @@ func TestClientGetBroker(t *testing.T) { func TestClientResurrectDeadSeeds(t *testing.T) { initialSeed := NewMockBroker(t, 0) - emptyMetadata := new(MetadataResponse) - initialSeed.Returns(emptyMetadata) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) + initialSeed.Returns(metadataResponse) conf := NewTestConfig() conf.Metadata.Retry.Backoff = 0 @@ -643,7 +699,6 @@ func TestClientResurrectDeadSeeds(t *testing.T) { if err != nil { t.Fatal(err) } - initialSeed.Close() client := c.(*client) @@ -658,6 +713,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, client.seedBrokers[0]) client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)} client.deadSeeds = []*Broker{} + client.brokers = map[int32]*Broker{} wg := sync.WaitGroup{} wg.Add(1) @@ -676,7 +732,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) { seed3.Close() seed1.Close() - seed2.Returns(emptyMetadata) + metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seed2.Addr(), seed2.BrokerID()) + seed2.Returns(metadataResponse2) wg.Wait() @@ -767,6 +825,7 @@ func TestClientMetadataTimeout(t *testing.T) { // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) + emptyMetadata.AddBroker(initialSeed.Addr(), initialSeed.BrokerID()) initialSeed.Returns(emptyMetadata) conf := NewTestConfig() @@ -996,6 +1055,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { coordinator := NewMockBroker(t, 2) metadataResponse1 := new(MetadataResponse) + metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse1) config := NewTestConfig() @@ -1011,11 +1071,13 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { seedBroker.Returns(coordinatorResponse1) metadataResponse2 := new(MetadataResponse) + metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition) seedBroker.Returns(metadataResponse2) replicas := []int32{coordinator.BrokerID()} metadataResponse3 := new(MetadataResponse) + metadataResponse3.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) seedBroker.Returns(metadataResponse3) @@ -1049,6 +1111,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) { defer seedBroker.Close() metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) seedBroker.Returns(metadataResponse) conf := NewTestConfig() @@ -1105,7 +1168,9 @@ func TestClientConnectionRefused(t *testing.T) { func TestClientCoordinatorConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) if err != nil { @@ -1130,7 +1195,10 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) { func TestInitProducerIDConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(&MetadataResponse{Version: 4}) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + metadataResponse.Version = 4 + seedBroker.Returns(metadataResponse) config := NewTestConfig() config.Producer.Idempotent = true @@ -1161,7 +1229,9 @@ func TestInitProducerIDConnectionRefused(t *testing.T) { func TestMetricsCleanup(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) config := NewTestConfig() metrics.GetOrRegisterMeter("a", config.MetricRegistry) diff --git a/client_tls_test.go b/client_tls_test.go index aa01d63a6..7c2432c99 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -197,7 +197,9 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon seedBroker := NewMockBrokerListener(childT, 1, seedListener) defer seedBroker.Close() - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) config := NewTestConfig() config.Net.TLS.Enable = true diff --git a/offset_manager_test.go b/offset_manager_test.go index c3ac33641..def296be6 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -78,7 +78,9 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, func TestNewOffsetManager(t *testing.T) { seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(new(MetadataResponse)) + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) + seedBroker.Returns(metadataResponse) defer seedBroker.Close() testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) diff --git a/sync_producer_test.go b/sync_producer_test.go index 8d366b011..776ae5f69 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -271,6 +271,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { } metadataResponse = new(MetadataResponse) + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition) broker.Returns(metadataResponse)