Skip to content

Commit

Permalink
Merge pull request #1236 from Mongey/cm-v1-create-acl-request
Browse files Browse the repository at this point in the history
Implements v1 of {Create,Describe,Delete}AclRequest
  • Loading branch information
bai authored Feb 22, 2019
2 parents 7bee863 + 66abce6 commit 8c2d15c
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 32 deletions.
26 changes: 21 additions & 5 deletions acl_bindings.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
package sarama

type Resource struct {
ResourceType AclResourceType
ResourceName string
ResourceType AclResourceType
ResourceName string
ResoucePatternType AclResourcePatternType
}

func (r *Resource) encode(pe packetEncoder) error {
func (r *Resource) encode(pe packetEncoder, version int16) error {
pe.putInt8(int8(r.ResourceType))

if err := pe.putString(r.ResourceName); err != nil {
return err
}

if version == 1 {
if r.ResoucePatternType == AclPatternUnknown {
Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead")
r.ResoucePatternType = AclPatternLiteral
}
pe.putInt8(int8(r.ResoucePatternType))
}

return nil
}

Expand All @@ -25,6 +34,13 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
if r.ResourceName, err = pd.getString(); err != nil {
return err
}
if version == 1 {
pattern, err := pd.getInt8()
if err != nil {
return err
}
r.ResoucePatternType = AclResourcePatternType(pattern)
}

return nil
}
Expand Down Expand Up @@ -80,8 +96,8 @@ type ResourceAcls struct {
Acls []*Acl
}

func (r *ResourceAcls) encode(pe packetEncoder) error {
if err := r.Resource.encode(pe); err != nil {
func (r *ResourceAcls) encode(pe packetEncoder, version int16) error {
if err := r.Resource.encode(pe, version); err != nil {
return err
}

Expand Down
17 changes: 12 additions & 5 deletions acl_create_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

type CreateAclsRequest struct {
Version int16
AclCreations []*AclCreation
}

Expand All @@ -10,7 +11,7 @@ func (c *CreateAclsRequest) encode(pe packetEncoder) error {
}

for _, aclCreation := range c.AclCreations {
if err := aclCreation.encode(pe); err != nil {
if err := aclCreation.encode(pe, c.Version); err != nil {
return err
}
}
Expand All @@ -19,6 +20,7 @@ func (c *CreateAclsRequest) encode(pe packetEncoder) error {
}

func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
c.Version = version
n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -41,20 +43,25 @@ func (d *CreateAclsRequest) key() int16 {
}

func (d *CreateAclsRequest) version() int16 {
return 0
return d.Version
}

func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}

type AclCreation struct {
Resource
Acl
}

func (a *AclCreation) encode(pe packetEncoder) error {
if err := a.Resource.encode(pe); err != nil {
func (a *AclCreation) encode(pe packetEncoder, version int16) error {
if err := a.Resource.encode(pe, version); err != nil {
return err
}
if err := a.Acl.encode(pe); err != nil {
Expand Down
34 changes: 33 additions & 1 deletion acl_create_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,21 @@ var (
2, // all
2, // deny
}
aclCreateRequestv1 = []byte{
0, 0, 0, 1,
3, // resource type = group
0, 5, 'g', 'r', 'o', 'u', 'p',
3, // resource pattten type = literal
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
2, // all
2, // deny
}
)

func TestCreateAclsRequest(t *testing.T) {
func TestCreateAclsRequestv0(t *testing.T) {
req := &CreateAclsRequest{
Version: 0,
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
Expand All @@ -32,3 +43,24 @@ func TestCreateAclsRequest(t *testing.T) {

testRequest(t, "create request", req, aclCreateRequest)
}

func TestCreateAclsRequestv1(t *testing.T) {
req := &CreateAclsRequest{
Version: 1,
AclCreations: []*AclCreation{{
Resource: Resource{
ResourceType: AclResourceGroup,
ResourceName: "group",
ResoucePatternType: AclPatternLiteral,
},
Acl: Acl{
Principal: "principal",
Host: "host",
Operation: AclOperationAll,
PermissionType: AclPermissionDeny,
}},
},
}

testRequest(t, "create request v1", req, aclCreateRequestv1)
}
13 changes: 11 additions & 2 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

type DeleteAclsRequest struct {
Version int
Filters []*AclFilter
}

Expand All @@ -10,6 +11,7 @@ func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
}

for _, filter := range d.Filters {
filter.Version = d.Version
if err := filter.encode(pe); err != nil {
return err
}
Expand All @@ -19,6 +21,7 @@ func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
}

func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
d.Version = int(version)
n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -27,6 +30,7 @@ func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error)
d.Filters = make([]*AclFilter, n)
for i := 0; i < n; i++ {
d.Filters[i] = new(AclFilter)
d.Filters[i].Version = int(version)
if err := d.Filters[i].decode(pd, version); err != nil {
return err
}
Expand All @@ -40,9 +44,14 @@ func (d *DeleteAclsRequest) key() int16 {
}

func (d *DeleteAclsRequest) version() int16 {
return 0
return int16(d.Version)
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
43 changes: 43 additions & 0 deletions acl_delete_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,28 @@ package sarama
import "testing"

var (
aclDeleteRequestNullsv1 = []byte{
0, 0, 0, 1,
1,
255, 255,
1, // Any
255, 255,
255, 255,
11,
3,
}

aclDeleteRequestv1 = []byte{
0, 0, 0, 1,
1, // any
0, 6, 'f', 'i', 'l', 't', 'e', 'r',
1, // Any Filter
0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
0, 4, 'h', 'o', 's', 't',
4, // write
3, // allow
}

aclDeleteRequestNulls = []byte{
0, 0, 0, 1,
1,
Expand Down Expand Up @@ -67,3 +89,24 @@ func TestDeleteAclsRequest(t *testing.T) {

testRequest(t, "delete request array", req, aclDeleteRequestArray)
}

func TestDeleteAclsRequestV1(t *testing.T) {
req := &DeleteAclsRequest{
Version: 1,
Filters: []*AclFilter{{
ResourceType: AclResourceAny,
Operation: AclOperationAlterConfigs,
PermissionType: AclPermissionAllow,
ResourcePatternTypeFilter: AclPatternAny,
}},
}

testRequest(t, "delete request nulls", req, aclDeleteRequestNullsv1)

req.Filters[0].ResourceName = nullString("filter")
req.Filters[0].Principal = nullString("principal")
req.Filters[0].Host = nullString("host")
req.Filters[0].Operation = AclOperationWrite

testRequest(t, "delete request", req, aclDeleteRequestv1)
}
13 changes: 7 additions & 6 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import "time"

type DeleteAclsResponse struct {
Version int16
ThrottleTime time.Duration
FilterResponses []*FilterResponse
}
Expand All @@ -15,7 +16,7 @@ func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
}

for _, filterResponse := range a.FilterResponses {
if err := filterResponse.encode(pe); err != nil {
if err := filterResponse.encode(pe, a.Version); err != nil {
return err
}
}
Expand Down Expand Up @@ -51,7 +52,7 @@ func (d *DeleteAclsResponse) key() int16 {
}

func (d *DeleteAclsResponse) version() int16 {
return 0
return int16(d.Version)
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
Expand All @@ -64,7 +65,7 @@ type FilterResponse struct {
MatchingAcls []*MatchingAcl
}

func (f *FilterResponse) encode(pe packetEncoder) error {
func (f *FilterResponse) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(f.Err))
if err := pe.putNullableString(f.ErrMsg); err != nil {
return err
Expand All @@ -74,7 +75,7 @@ func (f *FilterResponse) encode(pe packetEncoder) error {
return err
}
for _, matchingAcl := range f.MatchingAcls {
if err := matchingAcl.encode(pe); err != nil {
if err := matchingAcl.encode(pe, version); err != nil {
return err
}
}
Expand Down Expand Up @@ -115,13 +116,13 @@ type MatchingAcl struct {
Acl
}

func (m *MatchingAcl) encode(pe packetEncoder) error {
func (m *MatchingAcl) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(m.Err))
if err := pe.putNullableString(m.ErrMsg); err != nil {
return err
}

if err := m.Resource.encode(pe); err != nil {
if err := m.Resource.encode(pe, version); err != nil {
return err
}

Expand Down
13 changes: 11 additions & 2 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package sarama

type DescribeAclsRequest struct {
Version int
AclFilter
}

func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
d.AclFilter.Version = d.Version
return d.AclFilter.encode(pe)
}

func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
d.Version = int(version)
d.AclFilter.Version = int(version)
return d.AclFilter.decode(pd, version)
}

Expand All @@ -17,9 +21,14 @@ func (d *DescribeAclsRequest) key() int16 {
}

func (d *DescribeAclsRequest) version() int16 {
return 0
return int16(d.Version)
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
Loading

0 comments on commit 8c2d15c

Please sign in to comment.