Skip to content

Commit

Permalink
fix(varlogtest): validate log stream descriptor before creating it
Browse files Browse the repository at this point in the history
This PR adds a validator to the AddLogStream API in the varlogtest.
In particular, the validator checks the requested replicas — for example, whether they contain
duplicate storage nodes, whether their length is less than the replication factor, or whether they
reference unknown storage nodes.

Although the real varlogadm server's AddLogStream API is tested, the equivalent code in the
varlogtest package has, surprisingly, not been tested. To fix that properly, the implementations in
`internal/admin` and `pkg/varlogtest` would have to share the same test suite, but that would
require a fairly large refactoring. Therefore, this PR fixes the bug and adds some unit tests to the
varlogtest package.

Later we will revisit it to resolve #592.
  • Loading branch information
ijsong committed Sep 25, 2023
1 parent e1216ab commit 2867ea4
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 4 deletions.
20 changes: 16 additions & 4 deletions pkg/varlogtest/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,11 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt
LogStreamID: logStreamID,
TopicID: topicID,
Status: varlogpb.LogStreamStatusRunning,
Replicas: make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor),
Replicas: logStreamReplicas,
}

if logStreamReplicas == nil {
if lsd.Replicas == nil {
lsd.Replicas = make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor)
snids := c.vt.storageNodeIDs()
for i, j := range c.vt.rng.Perm(len(snids))[:c.vt.replicationFactor] {
snid := snids[j]
Expand All @@ -260,8 +261,19 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt
DataPath: dataPath,
}
}
} else {
lsd.Replicas = logStreamReplicas
}

if err := lsd.Validate(); err != nil {
return nil, err
}
if len(lsd.Replicas) < c.vt.replicationFactor {
return nil, errors.New("not enough replicas")
}
for _, rd := range lsd.Replicas {
_, ok := c.vt.storageNodes[rd.StorageNodeID]
if !ok {
return nil, fmt.Errorf("unknown storage node %d", rd.StorageNodeID)
}
}

c.vt.logStreams[logStreamID] = lsd
Expand Down
120 changes: 120 additions & 0 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math/rand"
"slices"
"strconv"
"sync"
"testing"
"time"
Expand All @@ -20,9 +21,128 @@ import (
"github.com/kakao/varlog/pkg/varlog/x/mlsa"
"github.com/kakao/varlog/pkg/varlogtest"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

// TestVarlogTestAddLogStream verifies that the fake admin in varlogtest
// validates the replica set handed to AddLogStream: it must reject unknown
// topics, missing storage nodes, duplicate storage nodes in the replicas, and
// replica sets shorter than the replication factor.
func TestVarlogTestAddLogStream(t *testing.T) {
	const (
		cid       = types.MinClusterID
		repfactor = 3
	)

	ctx := context.Background()

	// newStorageNodes registers repfactor storage nodes with the admin and
	// returns their metadata descriptors.
	newStorageNodes := func(t *testing.T, adm varlog.Admin) []snpb.StorageNodeMetadataDescriptor {
		descs := make([]snpb.StorageNodeMetadataDescriptor, 0, repfactor)
		for i := 0; i < repfactor; i++ {
			snid := types.MinStorageNodeID + types.StorageNodeID(i)
			addr := "sn" + strconv.Itoa(i+1)
			snm, err := adm.AddStorageNode(ctx, snid, addr)
			require.NoError(t, err)
			descs = append(descs, snm.StorageNodeMetadataDescriptor)
		}
		return descs
	}

	tcs := []struct {
		testf func(t *testing.T, adm varlog.Admin, tpid types.TopicID)
		name  string
	}{
		{
			name: "Succeed",
			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
				_ = newStorageNodes(t, adm)

				_, err := adm.AddLogStream(ctx, tpid, nil)
				require.NoError(t, err)
			},
		},
		{
			name: "CouldNotAddLogStream_NoSuchTopic",
			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
				// tpid+1 was never created by AddTopic.
				_, err := adm.AddLogStream(ctx, tpid+1, nil)
				require.Error(t, err)
			},
		},
		{
			name: "CouldNotAddLogStream_NotEnoughStorageNodes",
			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
				// No storage nodes are registered at all.
				_, err := adm.AddLogStream(ctx, tpid, nil)
				require.Error(t, err)
			},
		},
		{
			name: "CouldNotAddLogStream_DuplicateStorageNodeInReplicas",
			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
				descs := newStorageNodes(t, adm)

				// Every replica points at the same storage node.
				replicas := make([]*varlogpb.ReplicaDescriptor, 0, repfactor)
				for len(replicas) < repfactor {
					replicas = append(replicas, &varlogpb.ReplicaDescriptor{
						StorageNodeID:   descs[0].StorageNodeID,
						StorageNodePath: descs[0].Storages[0].Path,
					})
				}
				_, err := adm.AddLogStream(ctx, tpid, replicas)
				require.Error(t, err)
			},
		},
		{
			name: "CouldNotAddLogStream_UnknownStorageNode",
			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
				descs := newStorageNodes(t, adm)

				// Shift every storage node ID so none of them is registered.
				replicas := make([]*varlogpb.ReplicaDescriptor, 0, repfactor)
				for i := 0; i < repfactor; i++ {
					replicas = append(replicas, &varlogpb.ReplicaDescriptor{
						StorageNodeID:   descs[i].StorageNodeID + 10,
						StorageNodePath: descs[i].Storages[0].Path,
					})
				}
				_, err := adm.AddLogStream(ctx, tpid, replicas)
				require.Error(t, err)
			},
		},
		{
			name: "CouldNotAddLogStream_ReplicasLengthLessThanReplicationFactor",
			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
				descs := newStorageNodes(t, adm)

				// One replica short of the replication factor.
				replicas := make([]*varlogpb.ReplicaDescriptor, 0, repfactor-1)
				for i := 0; i < repfactor-1; i++ {
					replicas = append(replicas, &varlogpb.ReplicaDescriptor{
						StorageNodeID:   descs[i].StorageNodeID,
						StorageNodePath: descs[i].Storages[0].Path,
					})
				}
				_, err := adm.AddLogStream(ctx, tpid, replicas)
				require.Error(t, err)
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			vt, err := varlogtest.New(
				varlogtest.WithClusterID(cid),
				varlogtest.WithReplicationFactor(repfactor),
			)
			require.NoError(t, err)

			adm := vt.Admin()
			t.Cleanup(func() {
				require.NoError(t, adm.Close())
			})

			td, err := adm.AddTopic(ctx)
			require.NoError(t, err)

			tc.testf(t, adm, td.TopicID)
		})
	}
}

func TestVarlotTest_LogStreamAppender(t *testing.T) {
const (
cid = types.ClusterID(1)
Expand Down
32 changes: 32 additions & 0 deletions proto/varlogpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,28 @@ func (s TopicStatus) Deleted() bool {
return s == TopicStatusDeleted
}

// Validate checks that the log stream descriptor is well-formed: it must have
// at least one replica, every replica must be non-nil and itself valid, and no
// two replicas may share a storage node.
func (lsd LogStreamDescriptor) Validate() error {
	if len(lsd.Replicas) == 0 {
		return errors.New("log stream descriptor: no replicas")
	}

	// Size the set by the actual number of replicas rather than a hard-coded
	// constant so clusters with a replication factor above three do not force
	// map regrowth.
	snids := make(map[types.StorageNodeID]struct{}, len(lsd.Replicas))
	for idx := range lsd.Replicas {
		if lsd.Replicas[idx] == nil {
			return errors.New("log stream descriptor: nil replica")
		}
		if err := lsd.Replicas[idx].Validate(); err != nil {
			return fmt.Errorf("log stream descriptor: %w", err)
		}
		snids[lsd.Replicas[idx].StorageNodeID] = struct{}{}
	}
	// If two replicas named the same storage node, the set is smaller than
	// the replica list.
	if len(snids) != len(lsd.Replicas) {
		return errors.New("log stream descriptor: duplicate storage nodes")
	}
	return nil
}

func (l *LogStreamDescriptor) Valid() bool {
if l == nil || len(l.Replicas) == 0 {
return false
Expand All @@ -112,6 +134,16 @@ func (l *LogStreamDescriptor) IsReplica(snID types.StorageNodeID) bool {
return false
}

// Validate reports an error unless the replica descriptor refers to a valid
// storage node and carries a non-empty storage node path.
func (rd ReplicaDescriptor) Validate() error {
	switch {
	case rd.StorageNodeID.Invalid():
		return errors.New("replica descriptor: invalid storage node id")
	case len(rd.StorageNodePath) == 0:
		return fmt.Errorf("replica descriptor: no path in storage node %d", rd.StorageNodeID)
	default:
		return nil
	}
}

// valid reports whether the replica descriptor is non-nil and has a storage
// node path.
func (r *ReplicaDescriptor) valid() bool {
	if r == nil {
		return false
	}
	return len(r.StorageNodePath) > 0
}
Expand Down

0 comments on commit 2867ea4

Please sign in to comment.