Skip to content

Commit

Permalink
Optimize experimental Kafka scaler and fix consumer group logic
Browse files Browse the repository at this point in the history
Signed-off-by: Adrien Fillon <adrien.fillon@manomano.com>
  • Loading branch information
adrien-f committed Apr 25, 2024
1 parent e0fb59e commit b1f5874
Showing 1 changed file with 50 additions and 30 deletions.
80 changes: 50 additions & 30 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,58 +422,78 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log
}

func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string][]int, error) {
// in case of empty topic name, we will get all topics that the consumer group is subscribed to
if len(s.metadata.topic) == 0 {
return s.getTopicPartitionsFromConsumerGroup(ctx)
}

metadata, err := s.client.Metadata(ctx, &kafka.MetadataRequest{
Addr: s.client.Addr,
Addr: s.client.Addr,
Topics: s.metadata.topic,
})
if err != nil {
return nil, fmt.Errorf("error getting metadata: %w", err)
}
s.logger.V(1).Info(fmt.Sprintf("Listed topics %v", metadata.Topics))

if len(s.metadata.topic) == 0 {
// in case of empty topic name, we will get all topics that the consumer group is subscribed to
describeGrpReq := &kafka.DescribeGroupsRequest{
Addr: s.client.Addr,
GroupIDs: []string{
s.metadata.group,
},
}
describeGrp, err := s.client.DescribeGroups(ctx, describeGrpReq)
if err != nil {
return nil, fmt.Errorf("error describing group: %w", err)
}
if len(describeGrp.Groups[0].Members) == 0 {
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
}
s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))

result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
for _, partition := range topic.Partitions {
// if no partitions limitatitions are specified, all partitions are considered
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
partitions = append(partitions, partition.ID)
}
}
result[topic.Name] = partitions
}
return result, nil
result[topic.Name] = partitions
}
return result, nil
}

func (s *apacheKafkaScaler) getTopicPartitionsFromConsumerGroup(ctx context.Context) (map[string][]int, error) {
describeGrp, err := s.client.DescribeGroups(ctx, &kafka.DescribeGroupsRequest{
Addr: s.client.Addr,
GroupIDs: []string{
s.metadata.group,
},
})

// Currently, the request could fail because of an unsupported version of the protocol
// See: https://github.com/segmentio/kafka-go/issues/1212
if err != nil {
return nil, fmt.Errorf("error describing group %s: %w", s.metadata.group, err)
}

if describeGrp.Groups[0].Error != nil {
return nil, fmt.Errorf("error describing group %s: %w", s.metadata.group, describeGrp.Groups[0].Error)
}

if len(describeGrp.Groups[0].Members) == 0 {
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
}

s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))

// get all topics that the consumer group is subscribed to
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
for _, member := range describeGrp.Groups[0].Members {
for _, topic := range member.MemberAssignments.Topics {
if _, found := result[topic.Topic]; !found {
result[topic.Topic] = []int{}
}

for _, partition := range topic.Partitions {
// if no partitions limitations are specified, all partitions are considered
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
partitions = append(partitions, partition.ID)
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition))) {
result[topic.Topic] = append(result[topic.Topic], partition)
}
}
}
result[topic.Name] = partitions
}

return result, nil
}

Expand Down

0 comments on commit b1f5874

Please sign in to comment.