Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Satisfy the error interface in create responses #1154

Merged
merged 2 commits into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
}

if topicErr.Err != ErrNoError {
return topicErr.Err
return topicErr
}

return nil
Expand Down Expand Up @@ -354,7 +354,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
}

if topicErr.Err != ErrNoError {
return topicErr.Err
return topicErr
}

return nil
Expand Down
39 changes: 35 additions & 4 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"errors"
"strings"
"testing"
)

Expand Down Expand Up @@ -105,7 +106,7 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
}
}

func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

Expand All @@ -118,16 +119,17 @@ func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {

config := NewConfig()
config.Version = V0_11_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
if err != ErrInsufficientData {
err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
want := "insufficient permissions to create topic with reserved prefix"
if !strings.HasSuffix(err.Error(), want) {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -297,6 +299,35 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
}
}

func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

err = admin.CreatePartitions("_internal_topic", 3, nil, false)
want := "insufficient permissions to create partition on topic with reserved prefix"
if !strings.HasSuffix(err.Error(), want) {
t.Fatal(err)
}
err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminDeleteRecords(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
13 changes: 12 additions & 1 deletion create_partitions_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type CreatePartitionsResponse struct {
ThrottleTime time.Duration
Expand Down Expand Up @@ -69,6 +72,14 @@ type TopicPartitionError struct {
ErrMsg *string
}

func (t *TopicPartitionError) Error() string {
text := t.Err.Error()
if t.ErrMsg != nil {
text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
}
return text
}

func (t *TopicPartitionError) encode(pe packetEncoder) error {
pe.putInt16(int16(t.Err))

Expand Down
24 changes: 24 additions & 0 deletions create_partitions_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,27 @@ func TestCreatePartitionsResponse(t *testing.T) {
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
}
}

func TestTopicPartitionError(t *testing.T) {
// Assert that TopicPartitionError satisfies error interface
var err error = &TopicPartitionError{
Err: ErrTopicAuthorizationFailed,
}

got := err.Error()
want := ErrTopicAuthorizationFailed.Error()
if got != want {
t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want)
}

msg := "reason why topic authorization failed"
err = &TopicPartitionError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
got = err.Error()
want = ErrTopicAuthorizationFailed.Error() + " - " + msg
if got != want {
t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want)
}
}
13 changes: 12 additions & 1 deletion create_topics_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type CreateTopicsResponse struct {
Version int16
Expand Down Expand Up @@ -83,6 +86,14 @@ type TopicError struct {
ErrMsg *string
}

func (t *TopicError) Error() string {
text := t.Err.Error()
if t.ErrMsg != nil {
text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
}
return text
}

func (t *TopicError) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(t.Err))

Expand Down
24 changes: 24 additions & 0 deletions create_topics_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,27 @@ func TestCreateTopicsResponse(t *testing.T) {

testResponse(t, "version 2", resp, createTopicsResponseV2)
}

func TestTopicError(t *testing.T) {
// Assert that TopicError satisfies error interface
var err error = &TopicError{
Err: ErrTopicAuthorizationFailed,
}

got := err.Error()
want := ErrTopicAuthorizationFailed.Error()
if got != want {
t.Errorf("TopicError.Error() = %v; want %v", got, want)
}

msg := "reason why topic authorization failed"
err = &TopicError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
got = err.Error()
want = ErrTopicAuthorizationFailed.Error() + " - " + msg
if got != want {
t.Errorf("TopicError.Error() = %v; want %v", got, want)
}
}
21 changes: 20 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"fmt"
"strings"
)

// TestReporter has methods matching go's testing.T to avoid importing
Expand Down Expand Up @@ -612,10 +613,20 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {

func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*CreateTopicsRequest)
res := &CreateTopicsResponse{}
res := &CreateTopicsResponse{
Version: req.Version,
}
res.TopicErrors = make(map[string]*TopicError)

for topic, _ := range req.TopicDetails {
if res.Version >= 1 && strings.HasPrefix(topic, "_") {
msg := "insufficient permissions to create topic with reserved prefix"
res.TopicErrors[topic] = &TopicError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
continue
}
res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
}
return res
Expand Down Expand Up @@ -654,6 +665,14 @@ func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)

for topic, _ := range req.TopicPartitions {
if strings.HasPrefix(topic, "_") {
msg := "insufficient permissions to create partition on topic with reserved prefix"
res.TopicPartitionErrors[topic] = &TopicPartitionError{
Err: ErrTopicAuthorizationFailed,
ErrMsg: &msg,
}
continue
}
res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
}
return res
Expand Down