From 6c354b8b3ccbb06723f111444046bae8b0ec4dac Mon Sep 17 00:00:00 2001 From: Nelz Date: Thu, 28 Feb 2019 21:17:39 -0800 Subject: [PATCH] reduce client close bookkeeping --- async_producer.go | 28 +++++++++++++--------------- client.go | 15 +++++++++++++++ consumer.go | 25 +++++++++++-------------- consumer_group.go | 23 ++++++++++++----------- 4 files changed, 51 insertions(+), 40 deletions(-) diff --git a/async_producer.go b/async_producer.go index 5174a35b7..5db0a73d4 100644 --- a/async_producer.go +++ b/async_producer.go @@ -92,9 +92,8 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er } type asyncProducer struct { - client Client - conf *Config - ownClient bool + client Client + conf *Config errors chan *ProducerError input, successes, retries chan *ProducerMessage @@ -113,18 +112,19 @@ func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) { if err != nil { return nil, err } - - p, err := NewAsyncProducerFromClient(client) - if err != nil { - return nil, err - } - p.(*asyncProducer).ownClient = true - return p, nil + return newAsyncProducer(client) } // NewAsyncProducerFromClient creates a new Producer using the given client. It is still // necessary to call Close() on the underlying client when shutting down this producer. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) { + // For clients passed in by the client, ensure we don't + // call Close() on it. + cli := &nopCloserClient{client} + return newAsyncProducer(cli) +} + +func newAsyncProducer(client Client) (AsyncProducer, error) { // Check that we are not dealing with a closed Client before processing any other arguments if client.Closed() { return nil, ErrClosedClient @@ -999,11 +999,9 @@ func (p *asyncProducer) shutdown() { p.inFlight.Wait() - if p.ownClient { - err := p.client.Close() - if err != nil { - Logger.Println("producer/shutdown failed to close the embedded client:", err) - } + err := p.client.Close() + if err != nil { + Logger.Println("producer/shutdown failed to close the embedded client:", err) } close(p.input) diff --git a/client.go b/client.go index 0016f8fbe..efd93bb59 100644 --- a/client.go +++ b/client.go @@ -911,3 +911,18 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin client.resurrectDeadBrokers() return retry(ErrOutOfBrokers) } + +// nopCloserClient embeds an existing Client, but disables +// the Close method (yet all other methods pass +// through unchanged). This is for use in larger structs +// where it is undesirable to close the client that was +// passed in by the caller. +type nopCloserClient struct { + Client +} + +// Close intercepts and purposely does not call the underlying +// client's Close() method. +func (ncc *nopCloserClient) Close() error { + return nil +} diff --git a/consumer.go b/consumer.go index ce72ff1d6..c4d295791 100644 --- a/consumer.go +++ b/consumer.go @@ -75,9 +75,8 @@ type Consumer interface { } type consumer struct { - client Client - conf *Config - ownClient bool + client Client + conf *Config lock sync.Mutex children map[string]map[int32]*partitionConsumer @@ -90,18 +89,19 @@ func NewConsumer(addrs []string, config *Config) (Consumer, error) { if err != nil { return nil, err } - - c, err := NewConsumerFromClient(client) - if err != nil { - return nil, err - } - c.(*consumer).ownClient = true - return c, nil + return newConsumer(client) } // NewConsumerFromClient creates a new consumer using the given client. It is still // necessary to call Close() on the underlying client when shutting down this consumer. func NewConsumerFromClient(client Client) (Consumer, error) { + // For clients passed in by the client, ensure we don't + // call Close() on it. + cli := &nopCloserClient{client} + return newConsumer(cli) +} + +func newConsumer(client Client) (Consumer, error) { // Check that we are not dealing with a closed Client before processing any other arguments if client.Closed() { return nil, ErrClosedClient @@ -118,10 +118,7 @@ func NewConsumerFromClient(client Client) (Consumer, error) { } func (c *consumer) Close() error { - if c.ownClient { - return c.client.Close() - } - return nil + return c.client.Close() } func (c *consumer) Topics() ([]string, error) { diff --git a/consumer_group.go b/consumer_group.go index 8c8babcfd..7f9804dce 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -52,8 +52,7 @@ type ConsumerGroup interface { } type consumerGroup struct { - client Client - ownClient bool + client Client config *Config consumer Consumer @@ -73,20 +72,24 @@ func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerG return nil, err } - c, err := NewConsumerGroupFromClient(groupID, client) + c, err := newConsumerGroup(groupID, client) if err != nil { _ = client.Close() - return nil, err } - - c.(*consumerGroup).ownClient = true - return c, nil + return c, err } // NewConsumerGroupFromClient creates a new consumer group using the given client. It is still // necessary to call Close() on the underlying client when shutting down this consumer. // PLEASE NOTE: consumer groups can only re-use but not share clients. func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) { + // For clients passed in by the client, ensure we don't + // call Close() on it. + cli := &nopCloserClient{client} + return newConsumerGroup(groupID, cli) +} + +func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) { config := client.Config() if !config.Version.IsAtLeast(V0_10_2_0) { return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0") @@ -131,10 +134,8 @@ func (c *consumerGroup) Close() (err error) { err = e } - if c.ownClient { - if e := c.client.Close(); e != nil { - err = e - } + if e := c.client.Close(); e != nil { + err = e } }) return