Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support timeout when fetching metadata #1359

Merged
merged 1 commit into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,11 @@ func (client *client) RefreshMetadata(topics ...string) error {
}
}

return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
deadline := time.Time{}
if client.conf.Metadata.Timeout > 0 {
deadline = time.Now().Add(client.conf.Metadata.Timeout)
}
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
Expand Down Expand Up @@ -737,20 +741,32 @@ func (client *client) refreshMetadata() error {
return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
pastDeadline := func(backoff time.Duration) bool {
if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
// we are past the deadline
return true
}
return false
}
retry := func(err error) error {
if attemptsRemaining > 0 {
backoff := client.computeBackoff(attemptsRemaining)
if pastDeadline(backoff) {
Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
return err
}
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
if backoff > 0 {
time.Sleep(backoff)
}
return client.tryRefreshMetadata(topics, attemptsRemaining-1)
return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
}
return err
}

for broker := client.any(); broker != nil; broker = client.any() {
broker := client.any()
for ; broker != nil && !pastDeadline(0); broker = client.any() {
allowAutoTopicCreation := true
if len(topics) > 0 {
Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
Expand Down Expand Up @@ -788,6 +804,11 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
}
}

if broker != nil {
Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
}

Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
Expand Down
70 changes: 70 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"io"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -612,6 +613,75 @@ func TestClientController(t *testing.T) {
}
}

func TestClientMetadataTimeout(t *testing.T) {
for _, timeout := range []time.Duration{
250 * time.Millisecond, // Will cut the first retry pass
500 * time.Millisecond, // Will cut the second retry pass
750 * time.Millisecond, // Will cut the third retry pass
900 * time.Millisecond, // Will stop after the three retries
} {
t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
// Use a responsive broker to create a working client
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
initialSeed.Returns(emptyMetadata)

conf := NewConfig()
// Speed up the metadata request failure because of a read timeout
conf.Net.ReadTimeout = 100 * time.Millisecond
// Disable backoff and refresh
conf.Metadata.Retry.Backoff = 0
conf.Metadata.RefreshFrequency = 0
// But configure a "global" timeout
conf.Metadata.Timeout = timeout
c, err := NewClient([]string{initialSeed.Addr()}, conf)
if err != nil {
t.Fatal(err)
}
initialSeed.Close()

client := c.(*client)

// Start seed brokers that do not reply to anything and therefore a read
// on the TCP connection will timeout to simulate unresponsive brokers
seed1 := NewMockBroker(t, 1)
defer seed1.Close()
seed2 := NewMockBroker(t, 2)
defer seed2.Close()

// Overwrite the seed brokers with a fixed ordering to make this test deterministic
safeClose(t, client.seedBrokers[0])
client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
client.deadSeeds = []*Broker{}

// Start refreshing metadata in the background
errChan := make(chan error)
start := time.Now()
go func() {
errChan <- c.RefreshMetadata()
}()

// Check that the refresh fails fast enough (less than twice the configured timeout)
// instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms
maxRefreshDuration := 2 * timeout
select {
case err := <-errChan:
t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
if err == nil {
t.Fatal("Expected failed RefreshMetadata, got nil")
}
if err != ErrOutOfBrokers {
t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
}
case <-time.After(maxRefreshDuration):
t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
}

safeClose(t, c)
})
}
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ type Config struct {
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool

// How long to wait for a successful metadata response.
// Disabled by default which means a metadata request against an unreachable
// cluster (all brokers are unreachable or unresponsive) can take up to
// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
// to fail.
Timeout time.Duration
}

// Producer is the namespace for configuration related to producing messages,
Expand Down