Skip to content

Commit

Permalink
reduce client close bookkeeping
Browse files Browse the repository at this point in the history
  • Loading branch information
nelz9999 committed Mar 1, 2019
1 parent 6bc31ae commit 6c354b8
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 40 deletions.
28 changes: 13 additions & 15 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
}

type asyncProducer struct {
client Client
conf *Config
ownClient bool
client Client
conf *Config

errors chan *ProducerError
input, successes, retries chan *ProducerMessage
Expand All @@ -113,18 +112,19 @@ func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
if err != nil {
return nil, err
}

p, err := NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
p.(*asyncProducer).ownClient = true
return p, nil
return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
// For clients passed in by the client, ensure we don't
// call Close() on it.
cli := &nopCloserClient{client}
return newAsyncProducer(cli)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
Expand Down Expand Up @@ -999,11 +999,9 @@ func (p *asyncProducer) shutdown() {

p.inFlight.Wait()

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}

close(p.input)
Expand Down
15 changes: 15 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,3 +911,18 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
// where it is undesirable to close the client that was
// passed in by the caller.
type nopCloserClient struct {
Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error {
return nil
}
25 changes: 11 additions & 14 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ type Consumer interface {
}

type consumer struct {
client Client
conf *Config
ownClient bool
client Client
conf *Config

lock sync.Mutex
children map[string]map[int32]*partitionConsumer
Expand All @@ -90,18 +89,19 @@ func NewConsumer(addrs []string, config *Config) (Consumer, error) {
if err != nil {
return nil, err
}

c, err := NewConsumerFromClient(client)
if err != nil {
return nil, err
}
c.(*consumer).ownClient = true
return c, nil
return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
// For clients passed in by the client, ensure we don't
// call Close() on it.
cli := &nopCloserClient{client}
return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
Expand All @@ -118,10 +118,7 @@ func NewConsumerFromClient(client Client) (Consumer, error) {
}

func (c *consumer) Close() error {
if c.ownClient {
return c.client.Close()
}
return nil
return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
Expand Down
23 changes: 12 additions & 11 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type ConsumerGroup interface {
}

type consumerGroup struct {
client Client
ownClient bool
client Client

config *Config
consumer Consumer
Expand All @@ -73,20 +72,24 @@ func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerG
return nil, err
}

c, err := NewConsumerGroupFromClient(groupID, client)
c, err := newConsumerGroup(groupID, client)
if err != nil {
_ = client.Close()
return nil, err
}

c.(*consumerGroup).ownClient = true
return c, nil
return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
// For clients passed in by the client, ensure we don't
// call Close() on it.
cli := &nopCloserClient{client}
return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
config := client.Config()
if !config.Version.IsAtLeast(V0_10_2_0) {
return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
Expand Down Expand Up @@ -131,10 +134,8 @@ func (c *consumerGroup) Close() (err error) {
err = e
}

if c.ownClient {
if e := c.client.Close(); e != nil {
err = e
}
if e := c.client.Close(); e != nil {
err = e
}
})
return
Expand Down

0 comments on commit 6c354b8

Please sign in to comment.