Skip to content

Commit

Permalink
fix: kafka controller does not panic on invalid kafka client config (#…
Browse files Browse the repository at this point in the history
…3938)

* fix: kafka controller does not panic on invalid kafka client config

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: consumergroup configs are validated as non-nil

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Jul 23, 2024
1 parent 44f5263 commit 3d45d70
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func (cts *ConsumerTemplateSpec) Validate(ctx context.Context) *apis.FieldError
cts.Spec.Delivery.Validate(specCtx).ViaField("delivery"),
cts.Spec.Subscriber.Validate(specCtx).ViaField("subscriber"),
)
if cts.Spec.Configs.Configs == nil {
err = err.Also(apis.ErrMissingField("spec.configs"))
}
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func TestConsumerGroup_Validate(t *testing.T) {
Host: "127.0.0.1",
},
},
Configs: ConsumerConfigs{
Configs: map[string]string{},
},
},
},
},
Expand Down Expand Up @@ -136,6 +139,9 @@ func TestConsumerGroup_Validate(t *testing.T) {
APIVersion: "flows.knative.dev/v1",
},
},
Configs: ConsumerConfigs{
Configs: map[string]string{},
},
},
},
},
Expand Down Expand Up @@ -191,6 +197,9 @@ func TestConsumerGroup_Validate(t *testing.T) {
Host: "127.0.0.1",
},
},
Configs: ConsumerConfigs{
Configs: map[string]string{},
},
},
},
},
Expand All @@ -215,6 +224,9 @@ func TestConsumerGroup_Validate(t *testing.T) {
Host: "127.0.0.1",
},
},
Configs: ConsumerConfigs{
Configs: map[string]string{},
},
},
},
},
Expand Down Expand Up @@ -243,6 +255,9 @@ func TestConsumerGroup_Validate(t *testing.T) {
Host: "127.0.0.1",
},
},
Configs: ConsumerConfigs{
Configs: map[string]string{},
},
},
},
},
Expand All @@ -267,6 +282,9 @@ func TestConsumerGroup_Validate(t *testing.T) {
Host: "127.0.0.1",
},
},
Configs: ConsumerConfigs{
Configs: map[string]string{},
},
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,10 @@ func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkai
bootstrapServers := kafka.BootstrapServersArray(cg.Spec.Template.Spec.Configs.Configs["bootstrap.servers"])

kafkaClusterAdminClient, err := r.GetKafkaClusterAdmin(ctx, bootstrapServers, kafakSecret)
defer kafkaClusterAdminClient.Close()
if err != nil {
return fmt.Errorf("cannot obtain Kafka cluster admin, %w", err)
}
defer kafkaClusterAdminClient.Close()

groupId := cg.Spec.Template.Spec.Configs.Configs["group.id"]
if err = kafkaClusterAdminClient.DeleteConsumerGroup(groupId); err != nil && !errorIsOneOf(err, sarama.ErrUnknownTopicOrPartition, sarama.ErrGroupIDNotFound) {
Expand Down Expand Up @@ -599,10 +599,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern
bootstrapServers := kafka.BootstrapServersArray(cg.Spec.Template.Spec.Configs.Configs["bootstrap.servers"])

kafkaClusterAdminClient, err := r.GetKafkaClusterAdmin(ctx, bootstrapServers, kafkaSecret)
defer kafkaClusterAdminClient.Close()
if err != nil {
return fmt.Errorf("cannot obtain Kafka cluster admin, %w", err)
}
defer kafkaClusterAdminClient.Close()

kafkaClient, err := r.GetKafkaClient(ctx, bootstrapServers, kafkaSecret)
if err != nil {
Expand Down

0 comments on commit 3d45d70

Please sign in to comment.