Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for latest protocol messages #672

Merged
merged 1 commit into from
Jun 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api_versions_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sarama

type ApiVersionsRequest struct {
}

func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}

func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) {
return nil
}

func (r *ApiVersionsRequest) key() int16 {
return 18
}

func (r *ApiVersionsRequest) version() int16 {
return 0
}
14 changes: 14 additions & 0 deletions api_versions_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sarama

import "testing"

var (
apiVersionRequest = []byte{}
)

func TestApiVersionsRequest(t *testing.T) {
var request *ApiVersionsRequest

request = new(ApiVersionsRequest)
testRequest(t, "basic", request, apiVersionRequest)
}
74 changes: 74 additions & 0 deletions api_versions_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package sarama

type ApiVersionsResponseBlock struct {
ApiKey int16
MinVersion int16
MaxVersion int16
}

func (r *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
pe.putInt16(r.ApiKey)
pe.putInt16(r.MinVersion)
pe.putInt16(r.MaxVersion)
return nil
}

func (r *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
var err error

if r.ApiKey, err = pd.getInt16(); err != nil {
return err
}

if r.MinVersion, err = pd.getInt16(); err != nil {
return err
}

if r.MaxVersion, err = pd.getInt16(); err != nil {
return err
}

return nil
}

type ApiVersionsResponse struct {
Err KError
ApiVersions []*ApiVersionsResponseBlock
}

func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
return err
}
for _, apiVersion := range r.ApiVersions {
if err := apiVersion.encode(pe); err != nil {
return err
}
}
return nil
}

func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
r.Err = KError(kerr)
}

numBlocks, err := pd.getArrayLength()
if err != nil {
return err
}

r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
for i := 0; i < numBlocks; i++ {
block := new(ApiVersionsResponseBlock)
if err := block.decode(pd); err != nil {
return err
}
r.ApiVersions[i] = block
}

return nil
}
32 changes: 32 additions & 0 deletions api_versions_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package sarama

import "testing"

var (
apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}
)

func TestApiVersionsResponse(t *testing.T) {
var response *ApiVersionsResponse

response = new(ApiVersionsResponse)
testDecodable(t, "no error", response, apiVersionResponse)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
if response.ApiVersions[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
}
if response.ApiVersions[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
}
if response.ApiVersions[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
}
}
15 changes: 15 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
ErrMessageSizeTooLarge KError = 10
ErrStaleControllerEpochCode KError = 11
ErrOffsetMetadataTooLarge KError = 12
ErrNetworkException KError = 13
ErrOffsetsLoadInProgress KError = 14
ErrConsumerCoordinatorNotAvailable KError = 15
ErrNotCoordinatorForConsumer KError = 16
Expand All @@ -103,6 +104,10 @@ const (
ErrTopicAuthorizationFailed KError = 29
ErrGroupAuthorizationFailed KError = 30
ErrClusterAuthorizationFailed KError = 31
ErrInvalidTimestamp KError = 32
ErrUnsupportedSASLMechanism KError = 33
ErrIllegalSASLState KError = 34
ErrUnsupportedVersion KError = 35
)

func (err KError) Error() string {
Expand Down Expand Up @@ -137,6 +142,8 @@ func (err KError) Error() string {
return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
case ErrOffsetMetadataTooLarge:
return "kafka server: Specified a string larger than the configured maximum for offset metadata."
case ErrNetworkException:
return "kafka server: The server disconnected before a response was received."
case ErrOffsetsLoadInProgress:
return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
case ErrConsumerCoordinatorNotAvailable:
Expand Down Expand Up @@ -173,6 +180,14 @@ func (err KError) Error() string {
return "kafka server: The client is not authorized to access this group."
case ErrClusterAuthorizationFailed:
return "kafka server: The client is not authorized to send this request type."
case ErrInvalidTimestamp:
return "kafka server: The timestamp of the message is out of acceptable range."
case ErrUnsupportedSASLMechanism:
return "kafka server: The broker does not support the requested SASL mechanism."
case ErrIllegalSASLState:
return "kafka server: Request is not valid given the current SASL state."
case ErrUnsupportedVersion:
return "kafka server: The version of API is not supported."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
4 changes: 4 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func allocateBody(key, version int16) requestBody {
return &DescribeGroupsRequest{}
case 16:
return &ListGroupsRequest{}
case 17:
return &SaslHandshakeRequest{}
case 18:
return &ApiVersionsRequest{}
}
return nil
}
29 changes: 29 additions & 0 deletions sasl_handshake_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sarama

type SaslHandshakeRequest struct {
Mechanism string
}

func (r *SaslHandshakeRequest) encode(pe packetEncoder) error {
if err := pe.putString(r.Mechanism); err != nil {
return err
}

return nil
}

func (r *SaslHandshakeRequest) decode(pd packetDecoder) (err error) {
if r.Mechanism, err = pd.getString(); err != nil {
return err
}

return nil
}

func (r *SaslHandshakeRequest) key() int16 {
return 17
}

func (r *SaslHandshakeRequest) version() int16 {
return 0
}
17 changes: 17 additions & 0 deletions sasl_handshake_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sarama

import "testing"

var (
baseSaslRequest = []byte{
0, 3, 'f', 'o', 'o', // Mechanism
}
)

func TestSaslHandshakeRequest(t *testing.T) {
var request *SaslHandshakeRequest

request = new(SaslHandshakeRequest)
request.Mechanism = "foo"
testRequest(t, "basic", request, baseSaslRequest)
}
26 changes: 26 additions & 0 deletions sasl_handshake_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sarama

type SaslHandshakeResponse struct {
Err KError
EnabledMechanisms []string
}

func (r *SaslHandshakeResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putStringArray(r.EnabledMechanisms)
}

func (r *SaslHandshakeResponse) decode(pd packetDecoder) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
r.Err = KError(kerr)
}

var err error
if r.EnabledMechanisms, err = pd.getStringArray(); err != nil {
return err
}

return nil
}
24 changes: 24 additions & 0 deletions sasl_handshake_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package sarama

import "testing"

var (
saslHandshakeResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
}
)

func TestSaslHandshakeResponse(t *testing.T) {
var response *SaslHandshakeResponse

response = new(SaslHandshakeResponse)
testDecodable(t, "no error", response, saslHandshakeResponse)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
if response.EnabledMechanisms[0] != "foo" {
t.Error("Decoding error failed: expected 'foo' but found", response.EnabledMechanisms)
}
}