Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize experimental Kafka scaler and fix consumer group logic #5697

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 50 additions & 30 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,58 +422,78 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log
}

func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string][]int, error) {
// in case of empty topic name, we will get all topics that the consumer group is subscribed to
if len(s.metadata.topic) == 0 {
return s.getTopicPartitionsFromConsumerGroup(ctx)
}

metadata, err := s.client.Metadata(ctx, &kafka.MetadataRequest{
Addr: s.client.Addr,
Addr: s.client.Addr,
Topics: s.metadata.topic,
})
if err != nil {
return nil, fmt.Errorf("error getting metadata: %w", err)
}
s.logger.V(1).Info(fmt.Sprintf("Listed topics %v", metadata.Topics))

if len(s.metadata.topic) == 0 {
// in case of empty topic name, we will get all topics that the consumer group is subscribed to
describeGrpReq := &kafka.DescribeGroupsRequest{
Addr: s.client.Addr,
GroupIDs: []string{
s.metadata.group,
},
}
describeGrp, err := s.client.DescribeGroups(ctx, describeGrpReq)
if err != nil {
return nil, fmt.Errorf("error describing group: %w", err)
}
if len(describeGrp.Groups[0].Members) == 0 {
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
}
s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))

result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
for _, partition := range topic.Partitions {
// if no partitions limitatitions are specified, all partitions are considered
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
partitions = append(partitions, partition.ID)
}
}
result[topic.Name] = partitions
}
return result, nil
result[topic.Name] = partitions
}
return result, nil
}

func (s *apacheKafkaScaler) getTopicPartitionsFromConsumerGroup(ctx context.Context) (map[string][]int, error) {
describeGrp, err := s.client.DescribeGroups(ctx, &kafka.DescribeGroupsRequest{
Addr: s.client.Addr,
GroupIDs: []string{
s.metadata.group,
},
})

// Currently, the request could fail because of an unsupported version of the protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a known error, can we put a link to the issue in the comment so that we can easily trace it back in the future ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 👍

// See: https://github.com/segmentio/kafka-go/issues/1212
if err != nil {
return nil, fmt.Errorf("error describing group %s: %w", s.metadata.group, err)
}

if describeGrp.Groups[0].Error != nil {
return nil, fmt.Errorf("error describing group %s: %w", s.metadata.group, describeGrp.Groups[0].Error)
}

if len(describeGrp.Groups[0].Members) == 0 {
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
}

s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))

// get all topics that the consumer group is subscribed to
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
for _, member := range describeGrp.Groups[0].Members {
for _, topic := range member.MemberAssignments.Topics {
if _, found := result[topic.Topic]; !found {
result[topic.Topic] = []int{}
}

for _, partition := range topic.Partitions {
// if no partitions limitations are specified, all partitions are considered
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
partitions = append(partitions, partition.ID)
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition))) {
result[topic.Topic] = append(result[topic.Topic], partition)
}
}
}
result[topic.Name] = partitions
}

return result, nil
}

Expand Down
Loading