From 66abce6c253951dcc5a001492683686322d4a363 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Wed, 12 Dec 2018 18:03:49 +0000 Subject: [PATCH] Implements v1 of {Create,Describe,Delete}AclRequest --- acl_bindings.go | 26 +++++++++++++++++----- acl_create_request.go | 17 +++++++++----- acl_create_request_test.go | 34 +++++++++++++++++++++++++++- acl_delete_request.go | 13 +++++++++-- acl_delete_request_test.go | 43 ++++++++++++++++++++++++++++++++++++ acl_delete_response.go | 13 ++++++----- acl_describe_request.go | 13 +++++++++-- acl_describe_request_test.go | 34 ++++++++++++++++++++++++++-- acl_describe_response.go | 12 +++++++--- acl_filter.go | 29 +++++++++++++++++++----- acl_types.go | 12 ++++++++++ 11 files changed, 214 insertions(+), 32 deletions(-) diff --git a/acl_bindings.go b/acl_bindings.go index 51517359a..b91c28202 100644 --- a/acl_bindings.go +++ b/acl_bindings.go @@ -1,17 +1,26 @@ package sarama type Resource struct { - ResourceType AclResourceType - ResourceName string + ResourceType AclResourceType + ResourceName string + ResoucePatternType AclResourcePatternType } -func (r *Resource) encode(pe packetEncoder) error { +func (r *Resource) encode(pe packetEncoder, version int16) error { pe.putInt8(int8(r.ResourceType)) if err := pe.putString(r.ResourceName); err != nil { return err } + if version == 1 { + if r.ResoucePatternType == AclPatternUnknown { + Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead") + r.ResoucePatternType = AclPatternLiteral + } + pe.putInt8(int8(r.ResoucePatternType)) + } + return nil } @@ -25,6 +34,13 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) { if r.ResourceName, err = pd.getString(); err != nil { return err } + if version == 1 { + pattern, err := pd.getInt8() + if err != nil { + return err + } + r.ResoucePatternType = AclResourcePatternType(pattern) + } return nil } @@ -80,8 +96,8 @@ type ResourceAcls struct { Acls []*Acl } -func (r *ResourceAcls) encode(pe packetEncoder) error { - if err := r.Resource.encode(pe); err != nil { +func (r *ResourceAcls) encode(pe packetEncoder, version int16) error { + if err := r.Resource.encode(pe, version); err != nil { return err } diff --git a/acl_create_request.go b/acl_create_request.go index 0b6ecbec3..d3d5ad8d9 100644 --- a/acl_create_request.go +++ b/acl_create_request.go @@ -1,6 +1,7 @@ package sarama type CreateAclsRequest struct { + Version int16 AclCreations []*AclCreation } @@ -10,7 +11,7 @@ func (c *CreateAclsRequest) encode(pe packetEncoder) error { } for _, aclCreation := range c.AclCreations { - if err := aclCreation.encode(pe); err != nil { + if err := aclCreation.encode(pe, c.Version); err != nil { return err } } @@ -19,6 +20,7 @@ func (c *CreateAclsRequest) encode(pe packetEncoder) error { } func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) { + c.Version = version n, err := pd.getArrayLength() if err != nil { return err @@ -41,11 +43,16 @@ func (d *CreateAclsRequest) key() int16 { } func (d *CreateAclsRequest) version() int16 { - return 0 + return d.Version } func (d *CreateAclsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } type AclCreation struct { @@ -53,8 +60,8 @@ type AclCreation struct { Acl } -func (a *AclCreation) encode(pe packetEncoder) error { - if err := a.Resource.encode(pe); err != nil { +func (a *AclCreation) encode(pe packetEncoder, version int16) error { + if err := a.Resource.encode(pe, version); err != nil { return err } if err := a.Acl.encode(pe); err != nil { diff --git a/acl_create_request_test.go b/acl_create_request_test.go index fb4b35c16..b47453ac2 100644 --- a/acl_create_request_test.go +++ b/acl_create_request_test.go @@ -12,10 +12,21 @@ var ( 2, // all 2, // deny } + aclCreateRequestv1 = []byte{ + 0, 0, 0, 1, + 3, // resource type = group + 0, 5, 'g', 'r', 'o', 'u', 'p', + 3, // resource pattten type = literal + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 2, // all + 2, // deny + } ) -func TestCreateAclsRequest(t *testing.T) { +func TestCreateAclsRequestv0(t *testing.T) { req := &CreateAclsRequest{ + Version: 0, AclCreations: []*AclCreation{{ Resource: Resource{ ResourceType: AclResourceGroup, @@ -32,3 +43,24 @@ func TestCreateAclsRequest(t *testing.T) { testRequest(t, "create request", req, aclCreateRequest) } + +func TestCreateAclsRequestv1(t *testing.T) { + req := &CreateAclsRequest{ + Version: 1, + AclCreations: []*AclCreation{{ + Resource: Resource{ + ResourceType: AclResourceGroup, + ResourceName: "group", + ResoucePatternType: AclPatternLiteral, + }, + Acl: Acl{ + Principal: "principal", + Host: "host", + Operation: AclOperationAll, + PermissionType: AclPermissionDeny, + }}, + }, + } + + testRequest(t, "create request v1", req, aclCreateRequestv1) +} diff --git a/acl_delete_request.go b/acl_delete_request.go index 4133dceab..5e94ad733 100644 --- a/acl_delete_request.go +++ b/acl_delete_request.go @@ -1,6 +1,7 @@ package sarama type DeleteAclsRequest struct { + Version int Filters []*AclFilter } @@ -10,6 +11,7 @@ func (d *DeleteAclsRequest) encode(pe packetEncoder) error { } for _, filter := range d.Filters { + filter.Version = d.Version if err := filter.encode(pe); err != nil { return err } @@ -19,6 +21,7 @@ func (d *DeleteAclsRequest) encode(pe packetEncoder) error { } func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) { + d.Version = int(version) n, err := pd.getArrayLength() if err != nil { return err @@ -27,6 +30,7 @@ func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) d.Filters = make([]*AclFilter, n) for i := 0; i < n; i++ { d.Filters[i] = new(AclFilter) + d.Filters[i].Version = int(version) if err := d.Filters[i].decode(pd, version); err != nil { return err } @@ -40,9 +44,14 @@ func (d *DeleteAclsRequest) key() int16 { } func (d *DeleteAclsRequest) version() int16 { - return 0 + return int16(d.Version) } func (d *DeleteAclsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff --git a/acl_delete_request_test.go b/acl_delete_request_test.go index 2efdcb48e..cb126e30f 100644 --- a/acl_delete_request_test.go +++ b/acl_delete_request_test.go @@ -3,6 +3,28 @@ package sarama import "testing" var ( + aclDeleteRequestNullsv1 = []byte{ + 0, 0, 0, 1, + 1, + 255, 255, + 1, // Any + 255, 255, + 255, 255, + 11, + 3, + } + + aclDeleteRequestv1 = []byte{ + 0, 0, 0, 1, + 1, // any + 0, 6, 'f', 'i', 'l', 't', 'e', 'r', + 1, // Any Filter + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 4, // write + 3, // allow + } + aclDeleteRequestNulls = []byte{ 0, 0, 0, 1, 1, @@ -67,3 +89,24 @@ func TestDeleteAclsRequest(t *testing.T) { testRequest(t, "delete request array", req, aclDeleteRequestArray) } + +func TestDeleteAclsRequestV1(t *testing.T) { + req := &DeleteAclsRequest{ + Version: 1, + Filters: []*AclFilter{{ + ResourceType: AclResourceAny, + Operation: AclOperationAlterConfigs, + PermissionType: AclPermissionAllow, + ResourcePatternTypeFilter: AclPatternAny, + }}, + } + + testRequest(t, "delete request nulls", req, aclDeleteRequestNullsv1) + + req.Filters[0].ResourceName = nullString("filter") + req.Filters[0].Principal = nullString("principal") + req.Filters[0].Host = nullString("host") + req.Filters[0].Operation = AclOperationWrite + + testRequest(t, "delete request", req, aclDeleteRequestv1) +} diff --git a/acl_delete_response.go b/acl_delete_response.go index b5e1c45eb..a885fe5c0 100644 --- a/acl_delete_response.go +++ b/acl_delete_response.go @@ -3,6 +3,7 @@ package sarama import "time" type DeleteAclsResponse struct { + Version int16 ThrottleTime time.Duration FilterResponses []*FilterResponse } @@ -15,7 +16,7 @@ func (a *DeleteAclsResponse) encode(pe packetEncoder) error { } for _, filterResponse := range a.FilterResponses { - if err := filterResponse.encode(pe); err != nil { + if err := filterResponse.encode(pe, a.Version); err != nil { return err } } @@ -51,7 +52,7 @@ func (d *DeleteAclsResponse) key() int16 { } func (d *DeleteAclsResponse) version() int16 { - return 0 + return int16(d.Version) } func (d *DeleteAclsResponse) requiredVersion() KafkaVersion { @@ -64,7 +65,7 @@ type FilterResponse struct { MatchingAcls []*MatchingAcl } -func (f *FilterResponse) encode(pe packetEncoder) error { +func (f *FilterResponse) encode(pe packetEncoder, version int16) error { pe.putInt16(int16(f.Err)) if err := pe.putNullableString(f.ErrMsg); err != nil { return err @@ -74,7 +75,7 @@ func (f *FilterResponse) encode(pe packetEncoder) error { return err } for _, matchingAcl := range f.MatchingAcls { - if err := matchingAcl.encode(pe); err != nil { + if err := matchingAcl.encode(pe, version); err != nil { return err } } @@ -115,13 +116,13 @@ type MatchingAcl struct { Acl } -func (m *MatchingAcl) encode(pe packetEncoder) error { +func (m *MatchingAcl) encode(pe packetEncoder, version int16) error { pe.putInt16(int16(m.Err)) if err := pe.putNullableString(m.ErrMsg); err != nil { return err } - if err := m.Resource.encode(pe); err != nil { + if err := m.Resource.encode(pe, version); err != nil { return err } diff --git a/acl_describe_request.go b/acl_describe_request.go index 02a5a1f0e..3c95320e1 100644 --- a/acl_describe_request.go +++ b/acl_describe_request.go @@ -1,14 +1,18 @@ package sarama type DescribeAclsRequest struct { + Version int AclFilter } func (d *DescribeAclsRequest) encode(pe packetEncoder) error { + d.AclFilter.Version = d.Version return d.AclFilter.encode(pe) } func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) { + d.Version = int(version) + d.AclFilter.Version = int(version) return d.AclFilter.decode(pd, version) } @@ -17,9 +21,14 @@ func (d *DescribeAclsRequest) key() int16 { } func (d *DescribeAclsRequest) version() int16 { - return 0 + return int16(d.Version) } func (d *DescribeAclsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff --git a/acl_describe_request_test.go b/acl_describe_request_test.go index 3af14c616..3cb73eb79 100644 --- a/acl_describe_request_test.go +++ b/acl_describe_request_test.go @@ -13,15 +13,24 @@ var ( 5, // acl operation 3, // acl permission type } + aclDescribeRequestV1 = []byte{ + 2, // resource type + 0, 5, 't', 'o', 'p', 'i', 'c', + 1, // any Type + 0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l', + 0, 4, 'h', 'o', 's', 't', + 5, // acl operation + 3, // acl permission type + } ) -func TestAclDescribeRequest(t *testing.T) { +func TestAclDescribeRequestV0(t *testing.T) { resourcename := "topic" principal := "principal" host := "host" req := &DescribeAclsRequest{ - AclFilter{ + AclFilter: AclFilter{ ResourceType: AclResourceTopic, ResourceName: &resourcename, Principal: &principal, @@ -33,3 +42,24 @@ func TestAclDescribeRequest(t *testing.T) { testRequest(t, "", req, aclDescribeRequest) } + +func TestAclDescribeRequestV1(t *testing.T) { + resourcename := "topic" + principal := "principal" + host := "host" + + req := &DescribeAclsRequest{ + Version: 1, + AclFilter: AclFilter{ + ResourceType: AclResourceTopic, + ResourceName: &resourcename, + ResourcePatternTypeFilter: AclPatternAny, + Principal: &principal, + Host: &host, + Operation: AclOperationCreate, + PermissionType: AclPermissionAllow, + }, + } + + testRequest(t, "", req, aclDescribeRequestV1) +} diff --git a/acl_describe_response.go b/acl_describe_response.go index 5bc9497f4..db3a88c70 100644 --- a/acl_describe_response.go +++ b/acl_describe_response.go @@ -3,6 +3,7 @@ package sarama import "time" type DescribeAclsResponse struct { + Version int16 ThrottleTime time.Duration Err KError ErrMsg *string @@ -22,7 +23,7 @@ func (d *DescribeAclsResponse) encode(pe packetEncoder) error { } for _, resourceAcl := range d.ResourceAcls { - if err := resourceAcl.encode(pe); err != nil { + if err := resourceAcl.encode(pe, d.Version); err != nil { return err } } @@ -72,9 +73,14 @@ func (d *DescribeAclsResponse) key() int16 { } func (d *DescribeAclsResponse) version() int16 { - return 0 + return int16(d.Version) } func (d *DescribeAclsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch d.Version { + case 1: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff --git a/acl_filter.go b/acl_filter.go index 970635421..fad555875 100644 --- a/acl_filter.go +++ b/acl_filter.go @@ -1,12 +1,14 @@ package sarama type AclFilter struct { - ResourceType AclResourceType - ResourceName *string - Principal *string - Host *string - Operation AclOperation - PermissionType AclPermissionType + Version int + ResourceType AclResourceType + ResourceName *string + ResourcePatternTypeFilter AclResourcePatternType + Principal *string + Host *string + Operation AclOperation + PermissionType AclPermissionType } func (a *AclFilter) encode(pe packetEncoder) error { @@ -14,6 +16,11 @@ func (a *AclFilter) encode(pe packetEncoder) error { if err := pe.putNullableString(a.ResourceName); err != nil { return err } + + if a.Version == 1 { + pe.putInt8(int8(a.ResourcePatternTypeFilter)) + } + if err := pe.putNullableString(a.Principal); err != nil { return err } @@ -37,6 +44,16 @@ func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) { return err } + if a.Version == 1 { + pattern, err := pd.getInt8() + + if err != nil { + return err + } + + a.ResourcePatternTypeFilter = AclResourcePatternType(pattern) + } + if a.Principal, err = pd.getNullableString(); err != nil { return err } diff --git a/acl_types.go b/acl_types.go index 19da6f2f4..72b7985d0 100644 --- a/acl_types.go +++ b/acl_types.go @@ -40,3 +40,15 @@ const ( AclResourceCluster AclResourceType = 4 AclResourceTransactionalID AclResourceType = 5 ) + +type AclResourcePatternType int + +// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java + +const ( + AclPatternUnknown AclResourcePatternType = iota + AclPatternAny + AclPatternMatch + AclPatternLiteral + AclPatternPrefixed +)