Skip to content

Commit

Permalink
Merge pull request #2096 from agriffaut/bug/issue-2095
Browse files Browse the repository at this point in the history
fix: ignore non-nil but empty error strings in Describe/Alter client quotas responses
  • Loading branch information
dnwe authored Jan 4, 2022
2 parents 543b4cb + cf02615 commit 5feff36
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 1 deletion.
5 changes: 4 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent,
return nil, err
}

if rsp.ErrorMsg != nil {
if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
return nil, errors.New(*rsp.ErrorMsg)
}
if rsp.ErrorCode != ErrNoError {
Expand Down Expand Up @@ -1157,6 +1157,9 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie
}

for _, entry := range rsp.Entries {
if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
return errors.New(*entry.ErrorMsg)
}
if entry.ErrorCode != ErrNoError {
return entry.ErrorCode
}
Expand Down
125 changes: 125 additions & 0 deletions functional_admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//go:build functional
// +build functional

package sarama

import (
"testing"
)

func TestFuncAdminQuotas(t *testing.T) {
checkKafkaVersion(t, "2.6.0.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
if err != nil {
t.Fatal(err)
}

config := NewTestConfig()
config.Version = kafkaVersion
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}

// Check that we can read the quotas, and that they are empty
quotas, err := adminClient.DescribeClientQuotas(nil, false)
if err != nil {
t.Fatal(err)
}
if len(quotas) != 0 {
t.Fatalf("Expected quotas to be empty at start, found: %v", quotas)
}

// Put a quota on default user
// /config/users/<default>
defaultUser := []QuotaEntityComponent{{
EntityType: QuotaEntityUser,
MatchType: QuotaMatchDefault,
}}
produceOp := ClientQuotasOp{
Key: "producer_byte_rate",
Value: 1024000,
}
if err = adminClient.AlterClientQuotas(defaultUser, produceOp, false); err != nil {
t.Fatal(err)
}

// Check that we now have a quota entry
quotas, err = adminClient.DescribeClientQuotas(nil, false)
if err != nil {
t.Fatal(err)
}
if len(quotas) == 0 {
t.Fatal("Expected not empty quotas")
}
if len(quotas) > 1 {
t.Fatalf("Expected one quota entry, found: %v", quotas)
}

// Put a quota on specific client-id for a specific user
// /config/users/<user>/clients/<client-id>
specificUserClientID := []QuotaEntityComponent{
{
EntityType: QuotaEntityUser,
MatchType: QuotaMatchExact,
Name: "sarama",
},
{
EntityType: QuotaEntityClientID,
MatchType: QuotaMatchExact,
Name: "sarama-consumer",
},
}
consumeOp := ClientQuotasOp{
Key: "consumer_byte_rate",
Value: 2048000,
}
if err = adminClient.AlterClientQuotas(specificUserClientID, consumeOp, false); err != nil {
t.Fatal(err)
}

// Check that we can query a specific quota entry
userFilter := QuotaFilterComponent{
EntityType: QuotaEntityUser,
MatchType: QuotaMatchExact,
Match: "sarama",
}
clientFilter := QuotaFilterComponent{
EntityType: QuotaEntityClientID,
MatchType: QuotaMatchExact,
Match: "sarama-consumer",
}
quotas, err = adminClient.DescribeClientQuotas([]QuotaFilterComponent{userFilter, clientFilter}, true)
if err != nil {
t.Fatal(err)
}
if len(quotas) == 0 {
t.Fatal("Expected not empty quotas")
}
if len(quotas) > 1 {
t.Fatalf("Expected one quota entry, found: %v", quotas)
}
if quotas[0].Values[consumeOp.Key] != consumeOp.Value {
t.Fatalf("Expected specific quota value to be %f, found: %v", consumeOp.Value, quotas[0].Values[consumeOp.Key])
}

// Remove quota entries
deleteProduceOp := ClientQuotasOp{
Key: produceOp.Key,
Remove: true,
}
if err = adminClient.AlterClientQuotas(defaultUser, deleteProduceOp, false); err != nil {
t.Fatal(err)
}

deleteConsumeOp := ClientQuotasOp{
Key: consumeOp.Key,
Remove: true,
}
if err = adminClient.AlterClientQuotas(specificUserClientID, deleteConsumeOp, false); err != nil {
t.Fatal(err)
}
}

0 comments on commit 5feff36

Please sign in to comment.