Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl authored Jul 24, 2023
2 parents 759d84f + e5c618f commit 9e3ca7b
Show file tree
Hide file tree
Showing 36 changed files with 2,349 additions and 334 deletions.
8 changes: 1 addition & 7 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ agent:
global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.2.0-RC1
value: v2.2.0
prologue:
commands:
- checkout
Expand Down Expand Up @@ -109,17 +109,11 @@ blocks:
prologue:
commands:
# Install Go
- cache restore win-go-1.19
- "& .\\mk\\setup-go.ps1"
- cache delete win-go-1.19
- cache store win-go-1.19 ($env:USERPROFILE + '\go')
- cache restore msys2-x64
# Set up msys2
- ".\\mk\\mingw-w64\\setup-msys2.ps1"
- $env:PATH = 'C:\msys64\usr\bin;' + $env:PATH
- bash -lc './mk/mingw-w64/msys2-dependencies.sh'
- cache delete msys2-x64
- cache store msys2-x64 c:/msys64
jobs:
- name: "Static Build"
env_vars:
Expand Down
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
# Confluent's Golang client for Apache Kafka

# v2.3.0

This is a maintenance release.

## Fixes

* Fixes a bug in the mock schema registry client where the wrong ID was being
returned for pre-registered schema (#971, @srlk).


# v2.2.0

This is a feature release.

* [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
IncrementalAlterConfigs API (#945).
* [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API):
User SASL/SCRAM credentials alteration and description (#1004).

## Fixes

* Fixes a nil pointer bug in the protobuf `Serializer.Serialize()`, caused due to
Expand All @@ -16,7 +30,7 @@ This is a feature release.
* Deprecate m.LeaderEpoch in favor of m.TopicPartition.LeaderEpoch (#1012).

confluent-kafka-go is based on librdkafka v2.2.0, see the
[librdkafka v2.2.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0-RC1)
[librdkafka v2.2.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


Expand Down
3 changes: 3 additions & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ admin_delete_consumer_groups/admin_delete_consumer_groups
admin_delete_topics/admin_delete_topics
admin_describe_acls/admin_describe_acls
admin_describe_config/admin_describe_config
admin_incremental_alter_configs/admin_incremental_alter_configs
admin_describe_consumer_groups/admin_describe_consumer_groups
admin_list_consumer_groups/admin_list_consumer_groups
admin_list_consumer_group_offsets/admin_list_consumer_group_offsets
admin_describe_user_scram_credentials/admin_describe_user_scram_credentials
admin_alter_user_scram_credentials/admin_alter_user_scram_credentials
avro_generic_consumer_example/avro_generic_consumer_example
avro_generic_producer_example/avro_generic_producer_example
avro_specific_consumer_example/avro_specific_consumer_example
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Alter user SCRAM credentials example
package main

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func usage(reason string) {
fmt.Fprintf(os.Stderr,
"Error: %s\n",
reason)
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> "+
"UPSERT <user1> <mechanism1> "+
"<iterations1> <password1> <salt1> "+
"[UPSERT <user2> <mechanism2> <iterations2> "+
"<password2> <salt2> DELETE <user3> <mechanism3> ...]\n",
os.Args[0])
os.Exit(1)
}

func main() {
// 2 + variable arguments
nArgs := len(os.Args)

if nArgs < 2 {
usage("bootstrap-servers required")
}

if nArgs == 2 {
usage("at least one upsert/delete required")
}

bootstrapServers := os.Args[1]

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

var upsertions []kafka.UserScramCredentialUpsertion
var deletions []kafka.UserScramCredentialDeletion

i := 2
nAlterations := 0
for i < nArgs {
switch os.Args[i] {
case "UPSERT":
if i+5 >= nArgs {
usage(fmt.Sprintf(
"wrong argument count for alteration %d: expected 6, found %d",
nAlterations, nArgs-i))
}

user := os.Args[i+1]
mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2])
if err != nil {
usage(err.Error())
}
iterations, err := strconv.Atoi(os.Args[i+3])
if err != nil {
usage(err.Error())
}
password := []byte(os.Args[i+4])
salt := []byte(os.Args[i+5])
// if salt is an empty string,
// set it to nil to generate it randomly.
if len(salt) == 0 {
salt = nil
}
upsertions = append(upsertions,
kafka.UserScramCredentialUpsertion{
User: user,
ScramCredentialInfo: kafka.ScramCredentialInfo{
Mechanism: mechanism,
Iterations: iterations,
},
Password: password,
Salt: salt,
})
i += 6
case "DELETE":
if i+2 >= nArgs {
usage(fmt.Sprintf(
"wrong argument count for alteration %d: expected 3, found %d",
nAlterations, nArgs-i))
}

user := os.Args[i+1]
mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2])
if err != nil {
usage(err.Error())
}
deletions = append(deletions,
kafka.UserScramCredentialDeletion{
User: user,
Mechanism: mechanism,
})
i += 3
default:
usage(fmt.Sprintf("unknown alteration %s", os.Args[i]))
}
nAlterations++
}

alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, deletions)
if alterErr != nil {
fmt.Printf("Failed to alter user scram credentials: %s\n", alterErr)
os.Exit(1)
}

for username, err := range alterRes.Errors {
fmt.Printf("Username: %s\n", username)
if err.Code() == kafka.ErrNoError {
fmt.Printf(" Success\n")
} else {
fmt.Printf(" Error[%d]: %s\n", err.Code(), err.String())
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Describe user SCRAM credentials example
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func usage(reason string) {
fmt.Fprintf(os.Stderr,
"Error: %s\n",
reason)
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <user1> <user2> ...\n",
os.Args[0])
os.Exit(1)
}

func main() {

// 2 + n arguments
nArgs := len(os.Args)

if nArgs < 2 {
usage("bootstrap-servers required")
}

bootstrapServers := os.Args[1]

// Create new AdminClient.
ac, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer ac.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, os.Args[2:])
if describeErr != nil {
fmt.Printf("Failed to describe user scram credentials: %s\n", describeErr)
os.Exit(1)
}

for username, description := range describeRes.Descriptions {
fmt.Printf("Username: %s \n", username)
if description.Error.Code() == kafka.ErrNoError {
for i := 0; i < len(description.ScramCredentialInfos); i++ {
fmt.Printf(" Mechanism: %s Iterations: %d\n",
description.ScramCredentialInfos[i].Mechanism,
description.ScramCredentialInfos[i].Iterations)
}
} else {
fmt.Printf(" Error[%d]: %s\n",
description.Error.Code(), description.Error.String())
}
}

}
Loading

0 comments on commit 9e3ca7b

Please sign in to comment.