diff --git a/client.go b/client.go index 8b72ee743..c4c54b2dc 100644 --- a/client.go +++ b/client.go @@ -435,7 +435,11 @@ func (client *client) RefreshMetadata(topics ...string) error { } } - return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max) + deadline := time.Time{} + if client.conf.Metadata.Timeout > 0 { + deadline = time.Now().Add(client.conf.Metadata.Timeout) + } + return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline) } func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) { @@ -737,20 +741,32 @@ func (client *client) refreshMetadata() error { return nil } -func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error { +func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error { + pastDeadline := func(backoff time.Duration) bool { + if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) { + // we are past the deadline + return true + } + return false + } retry := func(err error) error { if attemptsRemaining > 0 { backoff := client.computeBackoff(attemptsRemaining) + if pastDeadline(backoff) { + Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout") + return err + } Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining) if backoff > 0 { time.Sleep(backoff) } - return client.tryRefreshMetadata(topics, attemptsRemaining-1) + return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline) } return err } - for broker := client.any(); broker != nil; broker = client.any() { + broker := client.any() + for ; broker != nil && !pastDeadline(0); broker = client.any() { allowAutoTopicCreation := true if len(topics) > 0 { Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr) @@ -800,6 +816,11 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) } } + if broker != nil { + Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr) + return retry(ErrOutOfBrokers) + } + Logger.Println("client/metadata no available broker to send metadata request to") client.resurrectDeadBrokers() return retry(ErrOutOfBrokers) diff --git a/client_test.go b/client_test.go index d0290bb4d..86fecac1d 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,7 @@ package sarama import ( + "fmt" "io" "sync" "sync/atomic" @@ -612,6 +613,75 @@ func TestClientController(t *testing.T) { } } +func TestClientMetadataTimeout(t *testing.T) { + for _, timeout := range []time.Duration{ + 250 * time.Millisecond, // Will cut the first retry pass + 500 * time.Millisecond, // Will cut the second retry pass + 750 * time.Millisecond, // Will cut the third retry pass + 900 * time.Millisecond, // Will stop after the three retries + } { + t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) { + // Use a responsive broker to create a working client + initialSeed := NewMockBroker(t, 0) + emptyMetadata := new(MetadataResponse) + initialSeed.Returns(emptyMetadata) + + conf := NewConfig() + // Speed up the metadata request failure because of a read timeout + conf.Net.ReadTimeout = 100 * time.Millisecond + // Disable backoff and refresh + conf.Metadata.Retry.Backoff = 0 + conf.Metadata.RefreshFrequency = 0 + // But configure a "global" timeout + conf.Metadata.Timeout = timeout + c, err := NewClient([]string{initialSeed.Addr()}, conf) + if err != nil { + t.Fatal(err) + } + initialSeed.Close() + + client := c.(*client) + + // Start seed brokers that do not reply to anything and therefore a read + // on the TCP connection will timeout to simulate unresponsive brokers + seed1 := NewMockBroker(t, 1) + defer seed1.Close() + seed2 := NewMockBroker(t, 2) + defer seed2.Close() + + // Overwrite the seed brokers with a fixed ordering to make this test deterministic + safeClose(t, client.seedBrokers[0]) + client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())} + client.deadSeeds = []*Broker{} + + // Start refreshing metadata in the background + errChan := make(chan error) + start := time.Now() + go func() { + errChan <- c.RefreshMetadata() + }() + + // Check that the refresh fails fast enough (less than twice the configured timeout) + // instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms + maxRefreshDuration := 2 * timeout + select { + case err := <-errChan: + t.Logf("Got err: %v after waiting for: %v", err, time.Since(start)) + if err == nil { + t.Fatal("Expected failed RefreshMetadata, got nil") + } + if err != ErrOutOfBrokers { + t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err) + } + case <-time.After(maxRefreshDuration): + t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration) + } + + safeClose(t, c) + }) + } +} + func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) staleCoordinator := NewMockBroker(t, 2) diff --git a/config.go b/config.go index d0cfa62e7..9cba9358e 100644 --- a/config.go +++ b/config.go @@ -121,6 +121,13 @@ type Config struct { // and usually more convenient, but can take up a substantial amount of // memory if you have many topics and partitions. Defaults to true. Full bool + + // How long to wait for a successful metadata response. + // Disabled by default which means a metadata request against an unreachable + // cluster (all brokers are unreachable or unresponsive) can take up to + // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max` + // to fail. + Timeout time.Duration } // Producer is the namespace for configuration related to producing messages,