diff --git a/client.go b/client.go index d20d356c0..dd861dca6 100644 --- a/client.go +++ b/client.go @@ -70,8 +70,8 @@ const ( ) type client struct { - conf *Config - closer chan none + conf *Config + closer, closed chan none // for shutting down background metadata updater // the broker addresses given to us through the constructor are not guaranteed to be returned in // the cluster metadata (I *think* it only returns brokers who are currently leading partitions?) @@ -111,6 +111,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) { client := &client{ conf: conf, closer: make(chan none), + closed: make(chan none), brokers: make(map[int32]*Broker), metadata: make(map[string]map[int32]*PartitionMetadata), cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32), @@ -151,6 +152,10 @@ func (client *client) Close() error { return ErrClosedClient } + // shutdown and wait for the background thread before we take the lock, to avoid races + close(client.closer) + <-client.closed + client.lock.Lock() defer client.lock.Unlock() Logger.Println("Closing Client") @@ -166,8 +171,6 @@ func (client *client) Close() error { client.brokers = nil client.metadata = nil - close(client.closer) - return nil } @@ -530,11 +533,15 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in // core metadata update logic func (client *client) backgroundMetadataUpdater() { + defer close(client.closed) + if client.conf.Metadata.RefreshFrequency == time.Duration(0) { return } ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency) + defer ticker.Stop() + for { select { case <-ticker.C: @@ -542,7 +549,6 @@ func (client *client) backgroundMetadataUpdater() { Logger.Println("Client background metadata update:", err) } case <-client.closer: - ticker.Stop() return } } diff --git a/client_test.go b/client_test.go index a5e182123..d1d24af64 100644 --- a/client_test.go +++ b/client_test.go @@ -4,6 +4,7 @@ import ( "io" "sync" "testing" + "time" ) func safeClose(t *testing.T, c io.Closer) { @@ -554,3 +555,45 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { seedBroker.Close() safeClose(t, client) } + +func TestClientAutorefreshShutdownRace(t *testing.T) { + seedBroker := newMockBroker(t, 1) + + metadataResponse := new(MetadataResponse) + seedBroker.Returns(metadataResponse) + + conf := NewConfig() + conf.Metadata.RefreshFrequency = 100 * time.Millisecond + client, err := NewClient([]string{seedBroker.Addr()}, conf) + if err != nil { + t.Fatal(err) + } + + // Wait for the background refresh to kick in + time.Sleep(110 * time.Millisecond) + + done := make(chan none) + go func() { + // Close the client + if err := client.Close(); err != nil { + t.Fatal(err) + } + close(done) + }() + + // Wait for the Close to kick in + time.Sleep(10 * time.Millisecond) + + // Then return some metadata to the still-running background thread + leader := newMockBroker(t, 2) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError) + seedBroker.Returns(metadataResponse) + + <-done + + seedBroker.Close() + + // give the update time to happen so we get a panic if it's still running (which it shouldn't) + time.Sleep(10 * time.Millisecond) +}