From 2867ea4ba21a731bf20dbe67491bb20e7a13e7c1 Mon Sep 17 00:00:00 2001
From: Injun Song
Date: Mon, 25 Sep 2023 23:08:19 +0900
Subject: [PATCH] fix(varlogtest): validate log stream descriptor to create it

This PR adds a validator to the AddLogStream API in the varlogtest
package. In particular, the validator checks the requested replicas:
for example, whether they contain duplicate storage nodes, whether
there are fewer replicas than the replication factor, or whether they
refer to unknown storage nodes.

Although the real varlogadm server's AddLogStream API is tested, the
equivalent API in the varlogtest package, surprisingly, has not been.
Ideally, the implementations in `internal/admin` and `pkg/varlogtest`
should share the same test suite, but that would require a fairly
large refactoring. Therefore, this PR fixes the bug and adds some unit
tests to the varlogtest package; the shared test suite will be
revisited to resolve #592.
---
 pkg/varlogtest/admin.go           |  20 ++++-
 pkg/varlogtest/varlogtest_test.go | 120 ++++++++++++++++++++++++++++++
 proto/varlogpb/metadata.go        |  32 ++++++++
 3 files changed, 168 insertions(+), 4 deletions(-)

diff --git a/pkg/varlogtest/admin.go b/pkg/varlogtest/admin.go
index c1453211e..de7aa4cf9 100644
--- a/pkg/varlogtest/admin.go
+++ b/pkg/varlogtest/admin.go
@@ -245,10 +245,11 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt
 		LogStreamID: logStreamID,
 		TopicID:     topicID,
 		Status:      varlogpb.LogStreamStatusRunning,
-		Replicas:    make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor),
+		Replicas:    logStreamReplicas,
 	}
 
-	if logStreamReplicas == nil {
+	if lsd.Replicas == nil {
+		lsd.Replicas = make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor)
 		snids := c.vt.storageNodeIDs()
 		for i, j := range c.vt.rng.Perm(len(snids))[:c.vt.replicationFactor] {
 			snid := snids[j]
@@ -260,8 +261,19 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt
 				DataPath:      dataPath,
 			}
 		}
-	} else {
-		lsd.Replicas = logStreamReplicas
+	}
+
+	if err := lsd.Validate(); err != nil {
+		return nil, err
+	}
+	if len(lsd.Replicas) < c.vt.replicationFactor {
+		return nil, errors.New("not enough replicas")
+	}
+	for _, rd := range lsd.Replicas {
+		_, ok := c.vt.storageNodes[rd.StorageNodeID]
+		if !ok {
+			return nil, fmt.Errorf("unknown storage node %d", rd.StorageNodeID)
+		}
 	}
 
 	c.vt.logStreams[logStreamID] = lsd
diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go
index dd951a0ef..b55fe81c5 100644
--- a/pkg/varlogtest/varlogtest_test.go
+++ b/pkg/varlogtest/varlogtest_test.go
@@ -6,6 +6,7 @@ import (
 	"io"
 	"math/rand"
 	"slices"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -20,9 +21,128 @@ import (
 	"github.com/kakao/varlog/pkg/varlog/x/mlsa"
 	"github.com/kakao/varlog/pkg/varlogtest"
 	"github.com/kakao/varlog/pkg/verrors"
+	"github.com/kakao/varlog/proto/snpb"
 	"github.com/kakao/varlog/proto/varlogpb"
 )
 
+func TestVarlogTestAddLogStream(t *testing.T) {
+	const (
+		cid       = types.MinClusterID
+		repfactor = 3
+	)
+
+	ctx := context.Background()
+
+	addStorageNodes := func(t *testing.T, adm varlog.Admin) []snpb.StorageNodeMetadataDescriptor {
+		snmds := make([]snpb.StorageNodeMetadataDescriptor, repfactor)
+		for i := 0; i < repfactor; i++ {
+			snid := types.MinStorageNodeID + types.StorageNodeID(i)
+			snaddr := "sn" + strconv.Itoa(i+1)
+			snm, err := adm.AddStorageNode(ctx, snid, snaddr)
+			require.NoError(t, err)
+			snmds[i] = snm.StorageNodeMetadataDescriptor
+		}
+		return snmds
+	}
+
+	tcs := []struct {
+		testf func(t *testing.T, adm varlog.Admin, tpid types.TopicID)
+		name  string
+	}{
+		{
+			name:  "Succeed",
+			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
+				_ = addStorageNodes(t, adm)
+
+				_, err := adm.AddLogStream(ctx, tpid, nil)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name:  "CouldNotAddLogStream_NoSuchTopic",
+			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
+				_, err := adm.AddLogStream(ctx, tpid+1, nil)
+				require.Error(t, err)
+			},
+		},
+		{
+			name:  "CouldNotAddLogStream_NotEnoughStorageNodes",
+			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
+				_, err := adm.AddLogStream(ctx, tpid, nil)
+				require.Error(t, err)
+			},
+		},
+		{
+			name:  "CouldNotAddLogStream_DuplicateStorageNodeInReplicas",
+			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
+				snmds := addStorageNodes(t, adm)
+
+				replicas := make([]*varlogpb.ReplicaDescriptor, repfactor)
+				for i := range replicas {
+					replicas[i] = &varlogpb.ReplicaDescriptor{
+						StorageNodeID:   snmds[0].StorageNodeID,
+						StorageNodePath: snmds[0].Storages[0].Path,
+					}
+				}
+				_, err := adm.AddLogStream(ctx, tpid, replicas)
+				require.Error(t, err)
+			},
+		},
+		{
+			name:  "CouldNotAddLogStream_UnknownStorageNode",
+			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
+				snmds := addStorageNodes(t, adm)
+
+				replicas := make([]*varlogpb.ReplicaDescriptor, repfactor)
+				for i := range replicas {
+					replicas[i] = &varlogpb.ReplicaDescriptor{
+						StorageNodeID:   snmds[i].StorageNodeID + 10,
+						StorageNodePath: snmds[i].Storages[0].Path,
+					}
+				}
+				_, err := adm.AddLogStream(ctx, tpid, replicas)
+				require.Error(t, err)
+			},
+		},
+		{
+			name:  "CouldNotAddLogStream_ReplicasLengthLessThanReplicationFactor",
+			testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) {
+				snmds := addStorageNodes(t, adm)
+
+				replicas := make([]*varlogpb.ReplicaDescriptor, repfactor-1)
+				for i := 0; i < repfactor-1; i++ {
+					replicas[i] = &varlogpb.ReplicaDescriptor{
+						StorageNodeID:   snmds[i].StorageNodeID,
+						StorageNodePath: snmds[i].Storages[0].Path,
+					}
+				}
+				_, err := adm.AddLogStream(ctx, tpid, replicas)
+				require.Error(t, err)
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			vt, err := varlogtest.New(
+				varlogtest.WithClusterID(cid),
+				varlogtest.WithReplicationFactor(repfactor),
+			)
+			require.NoError(t, err)
+
+			adm := vt.Admin()
+			t.Cleanup(func() {
+				require.NoError(t, adm.Close())
+			})
+
+			td, err := adm.AddTopic(ctx)
+			require.NoError(t, err)
+
+			tc.testf(t, adm, td.TopicID)
+		})
+	}
+}
+
 func TestVarlotTest_LogStreamAppender(t *testing.T) {
 	const (
 		cid = types.ClusterID(1)
diff --git a/proto/varlogpb/metadata.go b/proto/varlogpb/metadata.go
index fb6c44ea4..87ec0c1fa 100644
--- a/proto/varlogpb/metadata.go
+++ b/proto/varlogpb/metadata.go
@@ -88,6 +88,28 @@ func (s TopicStatus) Deleted() bool {
 	return s == TopicStatusDeleted
 }
 
+func (lsd LogStreamDescriptor) Validate() error {
+	if len(lsd.Replicas) == 0 {
+		return errors.New("log stream descriptor: no replicas")
+	}
+
+	const size = 3
+	snids := make(map[types.StorageNodeID]struct{}, size)
+	for idx := range lsd.Replicas {
+		if lsd.Replicas[idx] == nil {
+			return errors.New("log stream descriptor: nil replica")
+		}
+		if err := lsd.Replicas[idx].Validate(); err != nil {
+			return fmt.Errorf("log stream descriptor: %w", err)
+		}
+		snids[lsd.Replicas[idx].StorageNodeID] = struct{}{}
+	}
+	if len(snids) != len(lsd.Replicas) {
+		return errors.New("log stream descriptor: duplicate storage nodes")
+	}
+	return nil
+}
+
 func (l *LogStreamDescriptor) Valid() bool {
 	if l == nil || len(l.Replicas) == 0 {
 		return false
@@ -112,6 +134,16 @@ func (l *LogStreamDescriptor) IsReplica(snID types.StorageNodeID) bool {
 	return false
 }
 
+func (rd ReplicaDescriptor) Validate() error {
+	if rd.StorageNodeID.Invalid() {
+		return errors.New("replica descriptor: invalid storage node id")
+	}
+	if len(rd.StorageNodePath) == 0 {
+		return fmt.Errorf("replica descriptor: no path in storage node %d", rd.StorageNodeID)
+	}
+	return nil
+}
+
 func (r *ReplicaDescriptor) valid() bool {
 	return r != nil && len(r.StorageNodePath) != 0
 }
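
Illustrative usage (not part of the patch): a minimal sketch of how the new
LogStreamDescriptor.Validate method added above is expected to reject a
descriptor whose replicas point at the same storage node. The package main
wrapper, storage node ID 1 (assumed to be a valid ID), and the path "/tmp/sn1"
are assumptions made for the example; only the Validate and ReplicaDescriptor
behavior comes from the diff above.

package main

import (
	"fmt"

	"github.com/kakao/varlog/proto/varlogpb"
)

func main() {
	// Two replicas referring to the same storage node: each replica passes
	// its own ReplicaDescriptor.Validate check, but the duplicate-storage-node
	// check in LogStreamDescriptor.Validate should fail.
	lsd := varlogpb.LogStreamDescriptor{
		Replicas: []*varlogpb.ReplicaDescriptor{
			{StorageNodeID: 1, StorageNodePath: "/tmp/sn1"}, // sample values, assumed
			{StorageNodeID: 1, StorageNodePath: "/tmp/sn1"},
		},
	}
	if err := lsd.Validate(); err != nil {
		fmt.Println(err) // expected: "log stream descriptor: duplicate storage nodes"
	}
}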