Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce client close bookkeeping #1297

Merged
merged 1 commit into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
}

type asyncProducer struct {
client Client
conf *Config
ownClient bool
client Client
conf *Config

errors chan *ProducerError
input, successes, retries chan *ProducerMessage
Expand All @@ -113,18 +112,19 @@ func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
if err != nil {
return nil, err
}

p, err := NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
p.(*asyncProducer).ownClient = true
return p, nil
return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
// For clients passed in by the client, ensure we don't
// call Close() on it.
cli := &nopCloserClient{client}
return newAsyncProducer(cli)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
Expand Down Expand Up @@ -999,11 +999,9 @@ func (p *asyncProducer) shutdown() {

p.inFlight.Wait()

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}

close(p.input)
Expand Down
15 changes: 15 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,3 +911,18 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
// where it is undesirable to close the client that was
// passed in by the caller.
type nopCloserClient struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really like this pattern, we also do this wrapping pattern in one of our internal library. 👍

Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error {
return nil
}
25 changes: 11 additions & 14 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ type Consumer interface {
}

type consumer struct {
client Client
conf *Config
ownClient bool
client Client
conf *Config

lock sync.Mutex
children map[string]map[int32]*partitionConsumer
Expand All @@ -90,18 +89,19 @@ func NewConsumer(addrs []string, config *Config) (Consumer, error) {
if err != nil {
return nil, err
}

c, err := NewConsumerFromClient(client)
if err != nil {
return nil, err
}
c.(*consumer).ownClient = true
return c, nil
return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
// For clients passed in by the client, ensure we don't
// call Close() on it.
cli := &nopCloserClient{client}
return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
Expand All @@ -118,10 +118,7 @@ func NewConsumerFromClient(client Client) (Consumer, error) {
}

func (c *consumer) Close() error {
if c.ownClient {
return c.client.Close()
}
return nil
return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
Expand Down
23 changes: 12 additions & 11 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type ConsumerGroup interface {
}

type consumerGroup struct {
client Client
ownClient bool
client Client

config *Config
consumer Consumer
Expand All @@ -73,20 +72,24 @@ func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerG
return nil, err
}

c, err := NewConsumerGroupFromClient(groupID, client)
c, err := newConsumerGroup(groupID, client)
if err != nil {
_ = client.Close()
return nil, err
}

c.(*consumerGroup).ownClient = true
return c, nil
return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
// For clients passed in by the client, ensure we don't
// call Close() on it.
cli := &nopCloserClient{client}
return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
config := client.Config()
if !config.Version.IsAtLeast(V0_10_2_0) {
return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
Expand Down Expand Up @@ -131,10 +134,8 @@ func (c *consumerGroup) Close() (err error) {
err = e
}

if c.ownClient {
if e := c.client.Close(); e != nil {
err = e
}
if e := c.client.Close(); e != nil {
err = e
}
})
return
Expand Down