Skip to content

Commit

Permalink
client: stop the background thread before closing
Browse files Browse the repository at this point in the history
Otherwise, in the rare case where a background metadata refresh coincided
exactly with a call to `Client.Close()` there was a race where we might have
potentially tried to write to a nil map. Also add a test for this.

Fixes #422.
  • Loading branch information
eapache committed Apr 15, 2015
1 parent 0fc4105 commit dd6ba39
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
17 changes: 12 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ const (
)

type client struct {
conf *Config
closer chan none
conf *Config
closer, closed chan none // for shutting down background metadata updater

// the broker addresses given to us through the constructor are not guaranteed to be returned in
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
Expand Down Expand Up @@ -111,6 +111,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
client := &client{
conf: conf,
closer: make(chan none),
closed: make(chan none),
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
Expand All @@ -129,6 +130,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
// indicates that maybe part of the cluster is down, but is not fatal to creating the client
Logger.Println(err)
default:
close(client.closed) // we haven't started the background updater yet, so we have to do this manually
_ = client.Close()
return nil, err
}
Expand All @@ -151,6 +153,10 @@ func (client *client) Close() error {
return ErrClosedClient
}

// shutdown and wait for the background thread before we take the lock, to avoid races
close(client.closer)
<-client.closed

client.lock.Lock()
defer client.lock.Unlock()
Logger.Println("Closing Client")
Expand All @@ -166,8 +172,6 @@ func (client *client) Close() error {
client.brokers = nil
client.metadata = nil

close(client.closer)

return nil
}

Expand Down Expand Up @@ -530,19 +534,22 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
// core metadata update logic

func (client *client) backgroundMetadataUpdater() {
defer close(client.closed)

if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
return
}

ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := client.RefreshMetadata(); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
ticker.Stop()
return
}
}
Expand Down
43 changes: 43 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"sync"
"testing"
"time"
)

func safeClose(t *testing.T, c io.Closer) {
Expand Down Expand Up @@ -554,3 +555,45 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
seedBroker.Close()
safeClose(t, client)
}

func TestClientAutorefreshShutdownRace(t *testing.T) {
seedBroker := newMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
seedBroker.Returns(metadataResponse)

conf := NewConfig()
conf.Metadata.RefreshFrequency = 100 * time.Millisecond
client, err := NewClient([]string{seedBroker.Addr()}, conf)
if err != nil {
t.Fatal(err)
}

// Wait for the background refresh to kick in
time.Sleep(110 * time.Millisecond)

done := make(chan none)
go func() {
// Close the client
if err := client.Close(); err != nil {
t.Fatal(err)
}
close(done)
}()

// Wait for the Close to kick in
time.Sleep(10 * time.Millisecond)

// Then return some metadata to the still-running background thread
leader := newMockBroker(t, 2)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError)
seedBroker.Returns(metadataResponse)

<-done

seedBroker.Close()

// give the update time to happen so we get a panic if it's still running (which it shouldn't)
time.Sleep(10 * time.Millisecond)
}

0 comments on commit dd6ba39

Please sign in to comment.