Skip to content

Commit

Permalink
Refactor Consumer Group to use Client (#947)
Browse files Browse the repository at this point in the history
* refactor ConsumerGroup to use Client

* add test for readers sharing the default transport
  • Loading branch information
rhansen2 authored Sep 12, 2022
1 parent d4b89e7 commit ee37c7f
Show file tree
Hide file tree
Showing 11 changed files with 565 additions and 704 deletions.
67 changes: 18 additions & 49 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,15 @@ const (
ReadCommitted IsolationLevel = 1
)

var (
// DefaultClientID is the default value used as ClientID of kafka
// connections.
DefaultClientID string
)
// DefaultClientID is the default value used as ClientID of kafka
// connections.
var DefaultClientID string

func init() {
progname := filepath.Base(os.Args[0])
hostname, _ := os.Hostname()
DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", progname, hostname)
DefaultTransport.(*Transport).ClientID = DefaultClientID
}

// NewConn returns a new kafka connection for the given topic and partition.
Expand Down Expand Up @@ -263,10 +262,12 @@ func (c *Conn) Controller() (broker Broker, err error) {
}
for _, brokerMeta := range res.Brokers {
if brokerMeta.NodeID == res.ControllerID {
broker = Broker{ID: int(brokerMeta.NodeID),
broker = Broker{
ID: int(brokerMeta.NodeID),
Port: int(brokerMeta.Port),
Host: brokerMeta.Host,
Rack: brokerMeta.Rack}
Rack: brokerMeta.Rack,
}
break
}
}
Expand Down Expand Up @@ -322,7 +323,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(findCoordinator, v0, id, request)

},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -340,32 +340,6 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
return response, nil
}

// heartbeat sends a heartbeat message required by consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
var response heartbeatResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(heartbeat, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return heartbeatResponseV0{}, err
}
if response.ErrorCode != 0 {
return heartbeatResponseV0{}, Error(response.ErrorCode)
}

return response, nil
}

// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
Expand Down Expand Up @@ -752,9 +726,8 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
// with the default values in ReadBatchConfig except for minBytes and maxBytes.
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {

var adjustedDeadline time.Time
var maxFetch = int(c.fetchMaxBytes)
maxFetch := int(c.fetchMaxBytes)

if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
Expand Down Expand Up @@ -960,7 +933,6 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
// connection. If there are none, the method fetches all partitions of the kafka
// cluster.
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {

if len(topics) == 0 {
if len(c.topic) != 0 {
defaultTopics := [...]string{c.topic}
Expand Down Expand Up @@ -1107,11 +1079,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
switch produceVersion {
case v7:
recordBatch, err :=
newRecordBatch(
codec,
msgs...,
)
recordBatch, err := newRecordBatch(
codec,
msgs...,
)
if err != nil {
return err
}
Expand All @@ -1126,11 +1097,10 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
recordBatch,
)
case v3:
recordBatch, err :=
newRecordBatch(
codec,
msgs...,
)
recordBatch, err := newRecordBatch(
codec,
msgs...,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -1195,7 +1165,6 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
}
return size, err
}

})
if err != nil {
return size, err
Expand Down Expand Up @@ -1555,7 +1524,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
return nil, err
}
if version == v1 {
var request = saslAuthenticateRequestV0{Data: data}
request := saslAuthenticateRequestV0{Data: data}
var response saslAuthenticateResponseV0

err := c.writeOperation(
Expand Down
Loading

0 comments on commit ee37c7f

Please sign in to comment.