From c5f42484292f88bbc1139403e6cdcf655495c155 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 9 Jun 2016 12:30:33 -0400 Subject: [PATCH] Add support for latest protocol messages Specifically `SaslHandshake` (17) and `ApiVersions` (18) as well as related errors. --- api_versions_request.go | 20 +++++++++ api_versions_request_test.go | 14 +++++++ api_versions_response.go | 74 +++++++++++++++++++++++++++++++++ api_versions_response_test.go | 32 ++++++++++++++ errors.go | 15 +++++++ request.go | 4 ++ sasl_handshake_request.go | 29 +++++++++++++ sasl_handshake_request_test.go | 17 ++++++++ sasl_handshake_response.go | 26 ++++++++++++ sasl_handshake_response_test.go | 24 +++++++++++ 10 files changed, 255 insertions(+) create mode 100644 api_versions_request.go create mode 100644 api_versions_request_test.go create mode 100644 api_versions_response.go create mode 100644 api_versions_response_test.go create mode 100644 sasl_handshake_request.go create mode 100644 sasl_handshake_request_test.go create mode 100644 sasl_handshake_response.go create mode 100644 sasl_handshake_response_test.go diff --git a/api_versions_request.go b/api_versions_request.go new file mode 100644 index 000000000..13be55184 --- /dev/null +++ b/api_versions_request.go @@ -0,0 +1,20 @@ +package sarama + +type ApiVersionsRequest struct { +} + +func (r *ApiVersionsRequest) encode(pe packetEncoder) error { + return nil +} + +func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) { + return nil +} + +func (r *ApiVersionsRequest) key() int16 { + return 18 +} + +func (r *ApiVersionsRequest) version() int16 { + return 0 +} diff --git a/api_versions_request_test.go b/api_versions_request_test.go new file mode 100644 index 000000000..5ab4fa71c --- /dev/null +++ b/api_versions_request_test.go @@ -0,0 +1,14 @@ +package sarama + +import "testing" + +var ( + apiVersionRequest = []byte{} +) + +func TestApiVersionsRequest(t *testing.T) { + var request *ApiVersionsRequest + + request = new(ApiVersionsRequest) + testRequest(t, "basic", request, apiVersionRequest) +} diff --git a/api_versions_response.go b/api_versions_response.go new file mode 100644 index 000000000..ad810a8d3 --- /dev/null +++ b/api_versions_response.go @@ -0,0 +1,74 @@ +package sarama + +type ApiVersionsResponseBlock struct { + ApiKey int16 + MinVersion int16 + MaxVersion int16 +} + +func (r *ApiVersionsResponseBlock) encode(pe packetEncoder) error { + pe.putInt16(r.ApiKey) + pe.putInt16(r.MinVersion) + pe.putInt16(r.MaxVersion) + return nil +} + +func (r *ApiVersionsResponseBlock) decode(pd packetDecoder) error { + var err error + + if r.ApiKey, err = pd.getInt16(); err != nil { + return err + } + + if r.MinVersion, err = pd.getInt16(); err != nil { + return err + } + + if r.MaxVersion, err = pd.getInt16(); err != nil { + return err + } + + return nil +} + +type ApiVersionsResponse struct { + Err KError + ApiVersions []*ApiVersionsResponseBlock +} + +func (r *ApiVersionsResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + if err := pe.putArrayLength(len(r.ApiVersions)); err != nil { + return err + } + for _, apiVersion := range r.ApiVersions { + if err := apiVersion.encode(pe); err != nil { + return err + } + } + return nil +} + +func (r *ApiVersionsResponse) decode(pd packetDecoder) error { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + numBlocks, err := pd.getArrayLength() + if err != nil { + return err + } + + r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks) + for i := 0; i < numBlocks; i++ { + block := new(ApiVersionsResponseBlock) + if err := block.decode(pd); err != nil { + return err + } + r.ApiVersions[i] = block + } + + return nil +} diff --git a/api_versions_response_test.go b/api_versions_response_test.go new file mode 100644 index 000000000..6ca37e11e --- /dev/null +++ b/api_versions_response_test.go @@ -0,0 +1,32 @@ +package sarama + +import "testing" + +var ( + apiVersionResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, + 0x00, 0x02, + 0x00, 0x01, + } +) + +func TestApiVersionsResponse(t *testing.T) { + var response *ApiVersionsResponse + + response = new(ApiVersionsResponse) + testDecodable(t, "no error", response, apiVersionResponse) + if response.Err != ErrNoError { + t.Error("Decoding error failed: no error expected but found", response.Err) + } + if response.ApiVersions[0].ApiKey != 0x03 { + t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey) + } + if response.ApiVersions[0].MinVersion != 0x02 { + t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion) + } + if response.ApiVersions[0].MaxVersion != 0x01 { + t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion) + } +} diff --git a/errors.go b/errors.go index a837087f1..cfb7006f7 100644 --- a/errors.go +++ b/errors.go @@ -85,6 +85,7 @@ const ( ErrMessageSizeTooLarge KError = 10 ErrStaleControllerEpochCode KError = 11 ErrOffsetMetadataTooLarge KError = 12 + ErrNetworkException KError = 13 ErrOffsetsLoadInProgress KError = 14 ErrConsumerCoordinatorNotAvailable KError = 15 ErrNotCoordinatorForConsumer KError = 16 @@ -103,6 +104,10 @@ const ( ErrTopicAuthorizationFailed KError = 29 ErrGroupAuthorizationFailed KError = 30 ErrClusterAuthorizationFailed KError = 31 + ErrInvalidTimestamp KError = 32 + ErrUnsupportedSASLMechanism KError = 33 + ErrIllegalSASLState KError = 34 + ErrUnsupportedVersion KError = 35 ) func (err KError) Error() string { @@ -137,6 +142,8 @@ func (err KError) Error() string { return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)." case ErrOffsetMetadataTooLarge: return "kafka server: Specified a string larger than the configured maximum for offset metadata." + case ErrNetworkException: + return "kafka server: The server disconnected before a response was received." case ErrOffsetsLoadInProgress: return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition." case ErrConsumerCoordinatorNotAvailable: @@ -173,6 +180,14 @@ func (err KError) Error() string { return "kafka server: The client is not authorized to access this group." case ErrClusterAuthorizationFailed: return "kafka server: The client is not authorized to send this request type." + case ErrInvalidTimestamp: + return "kafka server: The timestamp of the message is out of acceptable range." + case ErrUnsupportedSASLMechanism: + return "kafka server: The broker does not support the requested SASL mechanism." + case ErrIllegalSASLState: + return "kafka server: Request is not valid given the current SASL state." + case ErrUnsupportedVersion: + return "kafka server: The version of API is not supported." } return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err) diff --git a/request.go b/request.go index b9f654ba2..4e887ebb8 100644 --- a/request.go +++ b/request.go @@ -107,6 +107,10 @@ func allocateBody(key, version int16) requestBody { return &DescribeGroupsRequest{} case 16: return &ListGroupsRequest{} + case 17: + return &SaslHandshakeRequest{} + case 18: + return &ApiVersionsRequest{} } return nil } diff --git a/sasl_handshake_request.go b/sasl_handshake_request.go new file mode 100644 index 000000000..3373bd100 --- /dev/null +++ b/sasl_handshake_request.go @@ -0,0 +1,29 @@ +package sarama + +type SaslHandshakeRequest struct { + Mechanism string +} + +func (r *SaslHandshakeRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.Mechanism); err != nil { + return err + } + + return nil +} + +func (r *SaslHandshakeRequest) decode(pd packetDecoder) (err error) { + if r.Mechanism, err = pd.getString(); err != nil { + return err + } + + return nil +} + +func (r *SaslHandshakeRequest) key() int16 { + return 17 +} + +func (r *SaslHandshakeRequest) version() int16 { + return 0 +} diff --git a/sasl_handshake_request_test.go b/sasl_handshake_request_test.go new file mode 100644 index 000000000..806e628fd --- /dev/null +++ b/sasl_handshake_request_test.go @@ -0,0 +1,17 @@ +package sarama + +import "testing" + +var ( + baseSaslRequest = []byte{ + 0, 3, 'f', 'o', 'o', // Mechanism + } +) + +func TestSaslHandshakeRequest(t *testing.T) { + var request *SaslHandshakeRequest + + request = new(SaslHandshakeRequest) + request.Mechanism = "foo" + testRequest(t, "basic", request, baseSaslRequest) +} diff --git a/sasl_handshake_response.go b/sasl_handshake_response.go new file mode 100644 index 000000000..3113d069c --- /dev/null +++ b/sasl_handshake_response.go @@ -0,0 +1,26 @@ +package sarama + +type SaslHandshakeResponse struct { + Err KError + EnabledMechanisms []string +} + +func (r *SaslHandshakeResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + return pe.putStringArray(r.EnabledMechanisms) +} + +func (r *SaslHandshakeResponse) decode(pd packetDecoder) error { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + var err error + if r.EnabledMechanisms, err = pd.getStringArray(); err != nil { + return err + } + + return nil +} diff --git a/sasl_handshake_response_test.go b/sasl_handshake_response_test.go new file mode 100644 index 000000000..a8d80cc04 --- /dev/null +++ b/sasl_handshake_response_test.go @@ -0,0 +1,24 @@ +package sarama + +import "testing" + +var ( + saslHandshakeResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, 'f', 'o', 'o', + } +) + +func TestSaslHandshakeResponse(t *testing.T) { + var response *SaslHandshakeResponse + + response = new(SaslHandshakeResponse) + testDecodable(t, "no error", response, saslHandshakeResponse) + if response.Err != ErrNoError { + t.Error("Decoding error failed: no error expected but found", response.Err) + } + if response.EnabledMechanisms[0] != "foo" { + t.Error("Decoding error failed: expected 'foo' but found", response.EnabledMechanisms) + } +}