Skip to content

Commit

Permalink
kip-394: support static membership
Browse files Browse the repository at this point in the history
  • Loading branch information
aiquestion committed May 16, 2022
1 parent 54a84a7 commit c7f1512
Show file tree
Hide file tree
Showing 32 changed files with 1,727 additions and 200 deletions.
31 changes: 29 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ type ClusterAdmin interface {
// locally cached value if it's available.
Controller() (*Broker, error)

// Remove members from the consumer group by given member identities.
// This operation is supported by brokers with version 2.3 or higher
// This is for static membership feature. KIP-345
RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -900,9 +905,13 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group
}

for broker, brokerGroups := range groupsPerBroker {
response, err := broker.DescribeGroups(&DescribeGroupsRequest{
describeReq := &DescribeGroupsRequest{
Groups: brokerGroups,
})
}
if ca.conf.Version.IsAtLeast(V2_3_0_0) {
describeReq.Version = 4
}
response, err := broker.DescribeGroups(describeReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1196,3 +1205,21 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie

return nil
}

func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
controller, err := ca.client.Coordinator(groupId)
if err != nil {
return nil, err
}
request := &LeaveGroupRequest{
Version: 3,
GroupId: groupId,
}
for _, instanceId := range groupInstanceIds {
groupInstanceId := instanceId
request.Members = append(request.Members, MemberIdentity{
GroupInstanceId: &groupInstanceId,
})
}
return controller.LeaveGroup(request)
}
31 changes: 31 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ type Config struct {
// coordinator for the group.
UserData []byte
}
// support KIP-345
InstanceId string
}

Retry struct {
Expand Down Expand Up @@ -509,6 +511,7 @@ func NewConfig() *Config {

// Validate checks a Config instance. It will return a
// ConfigurationError if the specified values don't make sense.
//nolint:gocyclo // This function's cyclomatic complexity has go beyond 100
func (c *Config) Validate() error {
// some configuration values should be warned on but not fail completely, do those first
if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
Expand Down Expand Up @@ -754,6 +757,14 @@ func (c *Config) Validate() error {
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}
if c.Consumer.Group.InstanceId != "" {
if !c.Version.IsAtLeast(V2_3_0_0) {
return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")
}
if err := validateGroupInstanceId(c.Consumer.Group.InstanceId); err != nil {
return err
}
}

// validate misc shared values
switch {
Expand All @@ -778,3 +789,23 @@ func (c *Config) getDialer() proxy.Dialer {
}
}
}

const MAX_GROUP_INSTANCE_ID_LENGTH = 249

var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`)

func validateGroupInstanceId(id string) error {
if id == "" {
return ConfigurationError("Group instance id must be non-empty string")
}
if id == "." || id == ".." {
return ConfigurationError(`Group instance id cannot be "." or ".."`)
}
if len(id) > MAX_GROUP_INSTANCE_ID_LENGTH {
return ConfigurationError(fmt.Sprintf(`Group instance id cannot be longer than %v, characters: %s`, MAX_GROUP_INSTANCE_ID_LENGTH, id))
}
if !GROUP_INSTANCE_ID_REGEXP.MatchString(id) {
return ConfigurationError(fmt.Sprintf(`Group instance id %s is illegal, it contains a character other than, '.', '_' and '-'`, id))
}
return nil
}
44 changes: 44 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"errors"
"os"
"strings"
"testing"

"github.com/rcrowley/go-metrics"
Expand Down Expand Up @@ -501,6 +502,49 @@ func TestZstdConfigValidation(t *testing.T) {
}
}

func TestValidGroupInstanceId(t *testing.T) {
tests := []struct {
grouptInstanceId string
shouldHaveErr bool
}{
{"groupInstanceId1", false},
{"", true},
{".", true},
{"..", true},
{strings.Repeat("a", 250), true},
{"group_InstanceId.1", false},
{"group-InstanceId1", false},
{"group#InstanceId1", true},
}
for _, testcase := range tests {
err := validateGroupInstanceId(testcase.grouptInstanceId)
if !testcase.shouldHaveErr {
if err != nil {
t.Errorf("Expected validGroupInstanceId %s to pass, got error %v", testcase.grouptInstanceId, err)
}
} else {
if err == nil {
t.Errorf("Expected validGroupInstanceId %s to be error, got nil", testcase.grouptInstanceId)
}
if _, ok := err.(ConfigurationError); !ok {
t.Errorf("Excepted err to be ConfigurationError, got %v", err)
}
}
}
}

func TestGroupInstanceIdAndVersionValidation(t *testing.T) {
config := NewTestConfig()
config.Consumer.Group.InstanceId = "groupInstanceId1"
if err := config.Validate(); string(err.(ConfigurationError)) != "Consumer.Group.InstanceId need Version >= 2.3" {
t.Error("Expected invalid zstd/kafka version error, got ", err)
}
config.Version = V2_3_0_0
if err := config.Validate(); err != nil {
t.Error("Expected zstd to work, got ", err)
}
}

// This example shows how to integrate with an existing registry as well as publishing metrics
// on the standard output
func ExampleConfig_metrics() {
Expand Down
102 changes: 75 additions & 27 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ type ConsumerGroup interface {
type consumerGroup struct {
client Client

config *Config
consumer Consumer
groupID string
memberID string
errors chan error
config *Config
consumer Consumer
groupID string
groupInstanceId *string
memberID string
errors chan error

lock sync.Mutex
closed chan none
Expand Down Expand Up @@ -127,15 +128,19 @@ func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
return nil, err
}

return &consumerGroup{
cg := &consumerGroup{
client: client,
consumer: consumer,
config: config,
groupID: groupID,
errors: make(chan error, config.ChannelBufferSize),
closed: make(chan none),
userData: config.Consumer.Group.Member.UserData,
}, nil
}
if client.Config().Consumer.Group.InstanceId != "" && config.Version.IsAtLeast(V2_3_0_0) {
cg.groupInstanceId = &client.Config().Consumer.Group.InstanceId
}
return cg, nil
}

// Errors implements ConsumerGroup.
Expand Down Expand Up @@ -300,6 +305,19 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, join.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrMemberIdRequired:
// from JoinGroupRequest v4, if client start with empty member id,
// it need to get member id from response and send another join request to join group
if retries <= 0 {
return nil, join.Err
}
c.memberID = join.MemberId
return c.retryNewSession(ctx, topics, handler, retries, false)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
}
return nil, join.Err
default:
return nil, join.Err
}
Expand Down Expand Up @@ -348,6 +366,11 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, groupRequest.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
}
return nil, groupRequest.Err
default:
return nil, groupRequest.Err
}
Expand Down Expand Up @@ -389,6 +412,10 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
req.Version = 1
req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
}
if c.groupInstanceId != nil {
req.Version = 5
req.GroupInstanceId = c.groupInstanceId
}

meta := &ConsumerGroupMemberMetadata{
Topics: topics,
Expand All @@ -409,6 +436,10 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if c.groupInstanceId != nil {
req.Version = 3
req.GroupInstanceId = c.groupInstanceId
}
for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
Expand All @@ -429,6 +460,10 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g
MemberId: memberID,
GenerationId: generationID,
}
if c.groupInstanceId != nil {
req.Version = 3
req.GroupInstanceId = c.groupInstanceId
}

return coordinator.Heartbeat(req)
}
Expand Down Expand Up @@ -466,25 +501,32 @@ func (c *consumerGroup) leave() error {
return err
}

resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
})
if err != nil {
_ = coordinator.Close()
return err
}
// KIP-345 if groupInstanceId is set, don not leave group when consumer closed.
// Since we do not discover ApiVersion for brokers, LeaveGroupRequest still use the old version request for now
if c.groupInstanceId == nil {
resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
})
if err != nil {
_ = coordinator.Close()
return err
}

// Unset memberID
c.memberID = ""
// Unset memberID
c.memberID = ""

// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
}
} else {
c.memberID = ""
}
return nil
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
Expand Down Expand Up @@ -628,15 +670,15 @@ type consumerGroupSession struct {
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
// init context
ctx, cancel := context.WithCancel(ctx)

// init offset manager
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel)
if err != nil {
return nil, err
}

// init context
ctx, cancel := context.WithCancel(ctx)

// init session
sess := &consumerGroupSession{
parent: parent,
Expand Down Expand Up @@ -862,6 +904,12 @@ func (s *consumerGroupSession) heartbeatLoop() {
s.cancel()
case ErrUnknownMemberId, ErrIllegalGeneration:
return
case ErrFencedInstancedId:
if s.parent.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *s.parent.groupInstanceId)
}
s.parent.handleError(resp.Err, "", -1)
return
default:
s.parent.handleError(resp.Err, "", -1)
return
Expand Down
Loading

0 comments on commit c7f1512

Please sign in to comment.