Skip to content

Commit

Permalink
Add functional test for admin client quotas
Browse files Browse the repository at this point in the history
  • Loading branch information
agriffaut committed Dec 31, 2021
1 parent a1807bf commit cf02615
Showing 1 changed file with 125 additions and 0 deletions.
125 changes: 125 additions & 0 deletions functional_admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//go:build functional
// +build functional

package sarama

import (
"testing"
)

func TestFuncAdminQuotas(t *testing.T) {
checkKafkaVersion(t, "2.6.0.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
if err != nil {
t.Fatal(err)
}

config := NewTestConfig()
config.Version = kafkaVersion
adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}

// Check that we can read the quotas, and that they are empty
quotas, err := adminClient.DescribeClientQuotas(nil, false)
if err != nil {
t.Fatal(err)
}
if len(quotas) != 0 {
t.Fatalf("Expected quotas to be empty at start, found: %v", quotas)
}

// Put a quota on default user
// /config/users/<default>
defaultUser := []QuotaEntityComponent{{
EntityType: QuotaEntityUser,
MatchType: QuotaMatchDefault,
}}
produceOp := ClientQuotasOp{
Key: "producer_byte_rate",
Value: 1024000,
}
if err = adminClient.AlterClientQuotas(defaultUser, produceOp, false); err != nil {
t.Fatal(err)
}

// Check that we now have a quota entry
quotas, err = adminClient.DescribeClientQuotas(nil, false)
if err != nil {
t.Fatal(err)
}
if len(quotas) == 0 {
t.Fatal("Expected not empty quotas")
}
if len(quotas) > 1 {
t.Fatalf("Expected one quota entry, found: %v", quotas)
}

// Put a quota on specific client-id for a specific user
// /config/users/<user>/clients/<client-id>
specificUserClientID := []QuotaEntityComponent{
{
EntityType: QuotaEntityUser,
MatchType: QuotaMatchExact,
Name: "sarama",
},
{
EntityType: QuotaEntityClientID,
MatchType: QuotaMatchExact,
Name: "sarama-consumer",
},
}
consumeOp := ClientQuotasOp{
Key: "consumer_byte_rate",
Value: 2048000,
}
if err = adminClient.AlterClientQuotas(specificUserClientID, consumeOp, false); err != nil {
t.Fatal(err)
}

// Check that we can query a specific quota entry
userFilter := QuotaFilterComponent{
EntityType: QuotaEntityUser,
MatchType: QuotaMatchExact,
Match: "sarama",
}
clientFilter := QuotaFilterComponent{
EntityType: QuotaEntityClientID,
MatchType: QuotaMatchExact,
Match: "sarama-consumer",
}
quotas, err = adminClient.DescribeClientQuotas([]QuotaFilterComponent{userFilter, clientFilter}, true)
if err != nil {
t.Fatal(err)
}
if len(quotas) == 0 {
t.Fatal("Expected not empty quotas")
}
if len(quotas) > 1 {
t.Fatalf("Expected one quota entry, found: %v", quotas)
}
if quotas[0].Values[consumeOp.Key] != consumeOp.Value {
t.Fatalf("Expected specific quota value to be %f, found: %v", consumeOp.Value, quotas[0].Values[consumeOp.Key])
}

// Remove quota entries
deleteProduceOp := ClientQuotasOp{
Key: produceOp.Key,
Remove: true,
}
if err = adminClient.AlterClientQuotas(defaultUser, deleteProduceOp, false); err != nil {
t.Fatal(err)
}

deleteConsumeOp := ClientQuotasOp{
Key: consumeOp.Key,
Remove: true,
}
if err = adminClient.AlterClientQuotas(specificUserClientID, deleteConsumeOp, false); err != nil {
t.Fatal(err)
}
}

0 comments on commit cf02615

Please sign in to comment.