Skip to content

Commit

Permalink
Merge pull request #1359 from slaunay/enhancement/metadata-refresh-fa…
Browse files Browse the repository at this point in the history
…il-fast

Support timeout when fetching metadata
  • Loading branch information
bai committed May 8, 2019
2 parents 457a8e6 + fe1b184 commit a5ecebc
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 4 deletions.
29 changes: 25 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,11 @@ func (client *client) RefreshMetadata(topics ...string) error {
}
}

return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
deadline := time.Time{}
if client.conf.Metadata.Timeout > 0 {
deadline = time.Now().Add(client.conf.Metadata.Timeout)
}
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
Expand Down Expand Up @@ -737,20 +741,32 @@ func (client *client) refreshMetadata() error {
return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
pastDeadline := func(backoff time.Duration) bool {
if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
// we are past the deadline
return true
}
return false
}
retry := func(err error) error {
if attemptsRemaining > 0 {
backoff := client.computeBackoff(attemptsRemaining)
if pastDeadline(backoff) {
Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
return err
}
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
if backoff > 0 {
time.Sleep(backoff)
}
return client.tryRefreshMetadata(topics, attemptsRemaining-1)
return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
}
return err
}

for broker := client.any(); broker != nil; broker = client.any() {
broker := client.any()
for ; broker != nil && !pastDeadline(0); broker = client.any() {
allowAutoTopicCreation := true
if len(topics) > 0 {
Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
Expand Down Expand Up @@ -800,6 +816,11 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
}
}

if broker != nil {
Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
}

Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
Expand Down
70 changes: 70 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"io"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -612,6 +613,75 @@ func TestClientController(t *testing.T) {
}
}

func TestClientMetadataTimeout(t *testing.T) {
for _, timeout := range []time.Duration{
250 * time.Millisecond, // Will cut the first retry pass
500 * time.Millisecond, // Will cut the second retry pass
750 * time.Millisecond, // Will cut the third retry pass
900 * time.Millisecond, // Will stop after the three retries
} {
t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
// Use a responsive broker to create a working client
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
initialSeed.Returns(emptyMetadata)

conf := NewConfig()
// Speed up the metadata request failure because of a read timeout
conf.Net.ReadTimeout = 100 * time.Millisecond
// Disable backoff and refresh
conf.Metadata.Retry.Backoff = 0
conf.Metadata.RefreshFrequency = 0
// But configure a "global" timeout
conf.Metadata.Timeout = timeout
c, err := NewClient([]string{initialSeed.Addr()}, conf)
if err != nil {
t.Fatal(err)
}
initialSeed.Close()

client := c.(*client)

// Start seed brokers that do not reply to anything and therefore a read
// on the TCP connection will timeout to simulate unresponsive brokers
seed1 := NewMockBroker(t, 1)
defer seed1.Close()
seed2 := NewMockBroker(t, 2)
defer seed2.Close()

// Overwrite the seed brokers with a fixed ordering to make this test deterministic
safeClose(t, client.seedBrokers[0])
client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
client.deadSeeds = []*Broker{}

// Start refreshing metadata in the background
errChan := make(chan error)
start := time.Now()
go func() {
errChan <- c.RefreshMetadata()
}()

// Check that the refresh fails fast enough (less than twice the configured timeout)
// instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms
maxRefreshDuration := 2 * timeout
select {
case err := <-errChan:
t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
if err == nil {
t.Fatal("Expected failed RefreshMetadata, got nil")
}
if err != ErrOutOfBrokers {
t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
}
case <-time.After(maxRefreshDuration):
t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
}

safeClose(t, c)
})
}
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ type Config struct {
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool

// How long to wait for a successful metadata response.
// Disabled by default which means a metadata request against an unreachable
// cluster (all brokers are unreachable or unresponsive) can take up to
// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
// to fail.
Timeout time.Duration
}

// Producer is the namespace for configuration related to producing messages,
Expand Down

0 comments on commit a5ecebc

Please sign in to comment.