Skip to content

Commit

Permalink
CreateMode: fixing incorrect create mode TTL including refactor of cr…
Browse files Browse the repository at this point in the history
…eate flags

BREAKING Client behavior: since this proposes a new parsing of the Create flag integer this will break clients if they rely on CreateContainer as it was creating znodes that may not have been containers.

Changes:
- Fix FlagTTL value from 4 -> 5
- Adding all known CreateMode values to Flag constants.
- Adding a createMode private struct behind the flag integer, replicate CreateMode from ZK java lib.
- Create, CreateContainer, CreateTTL methods now parse the flag integer passed in based on the constants defined.
- Rewrite tests to better catch CreateContainer and CreateTTL (no assertions on zk behavior since zk does not expose znode mode)
- Added Change-Detector unit tests for create mode values.
  • Loading branch information
jeffbean committed Jul 4, 2024
1 parent 6131812 commit a726174
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 79 deletions.
40 changes: 30 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,43 +1055,63 @@ func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
// same as the input, for example when creating a sequence znode the returned path
// will be the input path with a sequence number appended.
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
createMode, err := parseCreateMode(flags)
if err != nil {
return "", err
}

if err := validatePath(path, createMode.sequential); err != nil {
return "", err
}

res := &createResponse{}
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
_, err = c.request(opCreate, &CreateRequest{path, data, acl, createMode.toFlag()}, res, nil)
if err == ErrConnectionClosed {
return "", err
}
return res.Path, err
}

// CreateContainer creates a container znode and returns the path.
func (c *Conn) CreateContainer(path string, data []byte, flags int32, acl []ACL) (string, error) {
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
//
// Containers cannot be ephemeral or sequential, or have TTLs.
// Ensure that we reject flags for TTL, Sequence, and Ephemeral.
func (c *Conn) CreateContainer(path string, data []byte, flag int32, acl []ACL) (string, error) {
createMode, err := parseCreateMode(flag)
if err != nil {
return "", err
}
if flags&FlagTTL != FlagTTL {

if err := validatePath(path, createMode.sequential); err != nil {
return "", err
}

if !createMode.isContainer {
return "", ErrInvalidFlags
}

res := &createResponse{}
_, err := c.request(opCreateContainer, &CreateContainerRequest{path, data, acl, flags}, res, nil)
_, err = c.request(opCreateContainer, &CreateRequest{path, data, acl, createMode.toFlag()}, res, nil)
return res.Path, err
}

// CreateTTL creates a TTL znode, which will be automatically deleted by server after the TTL.
func (c *Conn) CreateTTL(path string, data []byte, flags int32, acl []ACL, ttl time.Duration) (string, error) {
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
func (c *Conn) CreateTTL(path string, data []byte, flag int32, acl []ACL, ttl time.Duration) (string, error) {
createMode, err := parseCreateMode(flag)
if err != nil {
return "", err
}
if flags&FlagTTL != FlagTTL {

if err := validatePath(path, createMode.sequential); err != nil {
return "", err
}

if !createMode.isTTL {
return "", ErrInvalidFlags
}

res := &createResponse{}
_, err := c.request(opCreateTTL, &CreateTTLRequest{path, data, acl, flags, ttl.Milliseconds()}, res, nil)
_, err = c.request(opCreateTTL, &CreateTTLRequest{path, data, acl, createMode.toFlag(), ttl.Milliseconds()}, res, nil)
return res.Path, err
}

Expand Down
7 changes: 0 additions & 7 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ const (
StateHasSession = State(101)
)

const (
// FlagEphemeral means the node is ephemeral.
FlagEphemeral = 1
FlagSequence = 2
FlagTTL = 4
)

var (
stateNames = map[State]string{
StateUnknown: "StateUnknown",
Expand Down
52 changes: 52 additions & 0 deletions create_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package zk

import "fmt"

const (
FlagPersistent = 0
FlagEphemeral = 1
FlagSequence = 2
FlagEphemeralSequential = 3
FlagContainer = 4
FlagTTL = 5
FlagPersistentSequentialWithTTL = 6
)

type createMode struct {
flag int32
ephemeral bool
sequential bool
isContainer bool
isTTL bool
}

func (cm *createMode) toFlag() int32 {
return cm.flag
}

// parsing a flag integer into the CreateMode needed to call the correct
// Create RPC to Zookeeper.
//
// NOTE: This parse method is designed to be able to copy and paste the same
// CreateMode ENUM constructors from Java:
// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java
func parseCreateMode(flag int32) (createMode, error) {
switch flag {
case FlagPersistent:
return createMode{0, false, false, false, false}, nil
case FlagEphemeral:
return createMode{1, true, false, false, false}, nil
case FlagSequence:
return createMode{2, false, true, false, false}, nil
case FlagEphemeralSequential:
return createMode{3, true, true, false, false}, nil
case FlagContainer:
return createMode{4, false, false, true, false}, nil
case FlagTTL:
return createMode{5, false, false, false, true}, nil
case FlagPersistentSequentialWithTTL:
return createMode{6, false, true, false, true}, nil
default:
return createMode{}, fmt.Errorf("invalid flag value: [%v]", flag)
}
}
43 changes: 43 additions & 0 deletions create_mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package zk

import (
"strings"
"testing"
)

func TestParseCreateMode(t *testing.T) {
changeDetectorTests := []struct {
name string
flag int32
wantIntValue int32
}{
{"valid flag createmode 0 persistant", FlagPersistent, 0},
{"ephemeral", FlagEphemeral, 1},
{"sequential", FlagSequence, 2},
{"ephemeral sequential", FlagEphemeralSequential, 3},
{"container", FlagContainer, 4},
{"ttl", FlagTTL, 5},
{"persistentSequential w/TTL", FlagPersistentSequentialWithTTL, 6},
}
for _, tt := range changeDetectorTests {
t.Run(tt.name, func(t *testing.T) {
cm, err := parseCreateMode(tt.flag)
requireNoError(t, err)
if cm.toFlag() != tt.wantIntValue {
// change detector test for enum values.
t.Fatalf("createmode value of flag; want: %v, got: %v", cm.toFlag(), tt.wantIntValue)
}
})
}

t.Run("failed to parse", func(t *testing.T) {
cm, err := parseCreateMode(-123)
if err == nil {
t.Fatalf("error expected, got: %v", cm)
}
if !strings.Contains(err.Error(), "invalid flag value") {
t.Fatalf("unexpected error value: %v", err)
}
})

}
6 changes: 1 addition & 5 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ type CreateRequest struct {
Flags int32
}

type CreateContainerRequest CreateRequest

type CreateTTLRequest struct {
Path string
Data []byte
Expand Down Expand Up @@ -598,10 +596,8 @@ func requestStructForOp(op int32) interface{} {
switch op {
case opClose:
return &closeRequest{}
case opCreate:
case opCreate, opCreateContainer:
return &CreateRequest{}
case opCreateContainer:
return &CreateContainerRequest{}
case opCreateTTL:
return &CreateTTLRequest{}
case opDelete:
Expand Down
173 changes: 116 additions & 57 deletions zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,46 +109,59 @@ func TestIntegration_CreateTTL(t *testing.T) {
}
defer zk.Close()

path := "/gozk-test"

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if _, err := zk.CreateTTL("", []byte{1, 2, 3, 4}, FlagTTL|FlagEphemeral, WorldACL(PermAll), 60*time.Second); err != ErrInvalidPath {
t.Fatalf("Create path check failed")
}
if _, err := zk.CreateTTL(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll), 60*time.Second); err != ErrInvalidFlags {
t.Fatalf("Create flags check failed")
}
if p, err := zk.CreateTTL(path, []byte{1, 2, 3, 4}, FlagTTL|FlagEphemeral, WorldACL(PermAll), 60*time.Second); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
tests := []struct {
name string
createFlags int32
giveDuration time.Duration
wantErr string
}{
{
name: "valid create ttl",
createFlags: FlagTTL,
giveDuration: time.Minute,
},
{
name: "valid change detector",
createFlags: 5,
giveDuration: time.Minute,
},
{
name: "invalid flag for create mode",
createFlags: 999,
giveDuration: time.Minute,
wantErr: "invalid flag value: [999]",
},
}

const testPath = "/ttl_znode_tests"
// create sub node to create per test in avoiding using the root path.
_, err = zk.Create(testPath, nil /* data */, FlagPersistent, WorldACL(PermAll))
requireNoError(t, err)

for idx, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
path := filepath.Join(testPath, fmt.Sprint(idx))
_, err := zk.CreateTTL(path, []byte{12}, tt.createFlags, WorldACL(PermAll), tt.giveDuration)
if tt.wantErr == "" {
requireNoError(t, err, fmt.Sprintf("error not expected: path; %q; flags %v", path, tt.createFlags))
return
}

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.CreateTTL(path, []byte{1, 2, 3, 4}, FlagTTL|FlagSequence, WorldACL(PermAll), 60*time.Second); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if !strings.HasPrefix(p, path) {
t.Fatalf("Create returned invalid path '%s' are not '%s' with sequence", p, path)
} else if data, stat, err := zk.Get(p); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
// want an error
if err == nil {
t.Fatalf("did not get expected error: %q", tt.wantErr)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Fatalf("wanted error not found: %v; got: %v", tt.wantErr, err.Error())
}
})
}
}

// NOTE: Currently there is not a way to get the znode after creating and
// asserting it is once mode or another. This means these tests are only testing the
// path of creation, but is not asserting that the resulting znode is the
// mode we set with flags.
func TestIntegration_CreateContainer(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand All @@ -161,28 +174,74 @@ func TestIntegration_CreateContainer(t *testing.T) {
}
defer zk.Close()

path := "/gozk-test"
tests := []struct {
name string
createFlags int32
wantErr string
}{
{
name: "valid create container",
createFlags: FlagContainer,
},
{
name: "valid create container hard coded flag int",
createFlags: 4,
// container flag, ensure matches ZK Create Mode (change detector test)
},
{
name: "invalid create mode",
createFlags: 999,
wantErr: "invalid flag value: [999]",
},
{
name: "invalid containers cannot be persistant",
createFlags: FlagPersistent,
wantErr: ErrInvalidFlags.Error(),
},
{
name: "invalid containers cannot be ephemeral",
createFlags: FlagEphemeral,
wantErr: ErrInvalidFlags.Error(),
},
{
name: "invalid containers cannot be sequential",
createFlags: FlagSequence,
wantErr: ErrInvalidFlags.Error(),
},
{
name: "invalid container and sequential",
createFlags: FlagContainer | FlagSequence,
wantErr: ErrInvalidFlags.Error(),
},
{
name: "invliad TTLs cannot be used with Container znodes",
createFlags: FlagTTL,
wantErr: ErrInvalidFlags.Error(),
},
}

const testPath = "/container_test_znode"
// create sub node to create per test in avoiding using the root path.
_, err = zk.Create(testPath, nil /* data */, FlagPersistent, WorldACL(PermAll))
requireNoError(t, err)

for idx, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
path := filepath.Join(testPath, fmt.Sprint(idx))
_, err := zk.CreateContainer(path, []byte{12}, tt.createFlags, WorldACL(PermAll))
if tt.wantErr == "" {
requireNoError(t, err, fmt.Sprintf("error not expected: path; %q; flags %v", path, tt.createFlags))
return
}

if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if _, err := zk.CreateContainer("", []byte{1, 2, 3, 4}, FlagTTL, WorldACL(PermAll)); err != ErrInvalidPath {
t.Fatalf("Create path check failed")
}
if _, err := zk.CreateContainer(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != ErrInvalidFlags {
t.Fatalf("Create flags check failed")
}
if p, err := zk.CreateContainer(path, []byte{1, 2, 3, 4}, FlagTTL, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
// want an error
if err == nil {
t.Fatalf("did not get expected error: %q", tt.wantErr)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Fatalf("wanted error not found: %v; got: %v", tt.wantErr, err.Error())
}
})
}
}

Expand Down

0 comments on commit a726174

Please sign in to comment.