Skip to content

Commit

Permalink
Merge pull request #1218 from nrvnrvn/match-error-codes-and-descriptions
Browse files Browse the repository at this point in the history
Synced error names and descriptions with the kafka's protocol
  • Loading branch information
bai authored Jan 23, 2019
2 parents 0a21d90 + a0f0d8f commit 04e4286
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 104 deletions.
8 changes: 4 additions & 4 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {

if topic == "" {
return ErrInvalidTopic
return ErrInvalidTopicException
}

if detail == nil {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
return ErrInvalidTopic
return ErrInvalidTopicException
}

request := &DeleteTopicsRequest{
Expand Down Expand Up @@ -188,7 +188,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
return ErrInvalidTopicException
}

topicPartitions := make(map[string]*TopicPartition)
Expand Down Expand Up @@ -224,7 +224,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {

if topic == "" {
return ErrInvalidTopic
return ErrInvalidTopicException
}

topics := make(map[string]*DeleteRecordsRequestTopic)
Expand Down
2 changes: 1 addition & 1 deletion admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
}

err = admin.DeleteTopic("")
if err != ErrInvalidTopic {
if err != ErrInvalidTopicException {
t.Fatal(err)
}

Expand Down
6 changes: 3 additions & 3 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (p *asyncProducer) dispatcher() {
continue
}
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageSizeTooLarge)
p.returnError(msg, ErrMessageTooLarge)
continue
}

Expand Down Expand Up @@ -827,7 +827,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
case ErrDuplicateSequenceNumber:
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
case ErrCorruptMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
retryTopics = append(retryTopics, topic)
// Other non-retriable errors
Expand All @@ -852,7 +852,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
}

switch block.Err {
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
case ErrCorruptMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
Expand Down
10 changes: 5 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (client *client) RefreshMetadata(topics ...string) error {
// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
for _, topic := range topics {
if len(topic) == 0 {
return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
return ErrInvalidTopicException // this is the error that 0.8.2 and later correctly return
}
}

Expand Down Expand Up @@ -465,7 +465,7 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
}

if coordinator == nil {
return nil, ErrConsumerCoordinatorNotAvailable
return nil, ErrCoordinatorNotAvailable
}

_ = coordinator.Open(client.conf)
Expand Down Expand Up @@ -790,7 +790,7 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
switch topic.Err {
case ErrNoError:
break
case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
case ErrInvalidTopicException, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
err = topic.Err
continue
case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
Expand Down Expand Up @@ -876,7 +876,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
return response, nil

case ErrConsumerCoordinatorNotAvailable:
case ErrCoordinatorNotAvailable:
Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)

// This is very ugly, but this scenario will only happen once per cluster.
Expand All @@ -887,7 +887,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
time.Sleep(2 * time.Second)
}

return retry(ErrConsumerCoordinatorNotAvailable)
return retry(ErrCoordinatorNotAvailable)
default:
return nil, response.Err
}
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
coordinatorResponse1.Err = ErrCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

coordinatorResponse2 := new(ConsumerMetadataResponse)
Expand Down Expand Up @@ -581,7 +581,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
coordinatorResponse1.Err = ErrCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

metadataResponse2 := new(MetadataResponse)
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ type Config struct {
Default int32
// The maximum number of message bytes to fetch from the broker in a
// single request. Messages larger than this will return
// ErrMessageTooLarge and will not be consumable, so you must be sure
// ErrMessageSizeTooLarge and will not be consumable, so you must be sure
// this is at least as large as your largest message. Defaults to 0
// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
// global `sarama.MaxResponseSize` still applies.
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
if partialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageTooLarge)
child.sendError(ErrMessageSizeTooLarge)
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
Expand Down
8 changes: 4 additions & 4 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
switch join.Err {
case ErrNoError:
c.memberID = join.MemberId
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
}
switch sync.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
Expand Down Expand Up @@ -366,7 +366,7 @@ func (c *consumerGroup) leave() error {

// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
case ErrRebalanceInProgress, ErrUnknownMemberID, ErrNoError:
return nil
default:
return resp.Err
Expand Down Expand Up @@ -664,7 +664,7 @@ func (s *consumerGroupSession) heartbeatLoop() {
switch resp.Err {
case ErrNoError:
retries = s.parent.config.Metadata.Retry.Max
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
case ErrRebalanceInProgress, ErrUnknownMemberID, ErrIllegalGeneration:
return
default:
s.parent.handleError(err, "", -1)
Expand Down
6 changes: 3 additions & 3 deletions consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ var (
)

func TestConsumerMetadataResponseError(t *testing.T) {
response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
response := &ConsumerMetadataResponse{Err: ErrCoordinatorLoadInProgress}
testEncodable(t, "", response, consumerMetadataResponseError)

decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
t.Error("could not decode: ", err)
}

if decodedResp.Err != ErrOffsetsLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
if decodedResp.Err != ErrCoordinatorLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrCoordinatorLoadInProgress)
}
}

Expand Down
Loading

0 comments on commit 04e4286

Please sign in to comment.