diff --git a/pkg/varlogtest/admin.go b/pkg/varlogtest/admin.go index c1453211e..de7aa4cf9 100644 --- a/pkg/varlogtest/admin.go +++ b/pkg/varlogtest/admin.go @@ -245,10 +245,11 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt LogStreamID: logStreamID, TopicID: topicID, Status: varlogpb.LogStreamStatusRunning, - Replicas: make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor), + Replicas: logStreamReplicas, } - if logStreamReplicas == nil { + if lsd.Replicas == nil { + lsd.Replicas = make([]*varlogpb.ReplicaDescriptor, c.vt.replicationFactor) snids := c.vt.storageNodeIDs() for i, j := range c.vt.rng.Perm(len(snids))[:c.vt.replicationFactor] { snid := snids[j] @@ -260,8 +261,19 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt DataPath: dataPath, } } - } else { - lsd.Replicas = logStreamReplicas + } + + if err := lsd.Validate(); err != nil { + return nil, err + } + if len(lsd.Replicas) < c.vt.replicationFactor { + return nil, errors.New("not enough replicas") + } + for _, rd := range lsd.Replicas { + _, ok := c.vt.storageNodes[rd.StorageNodeID] + if !ok { + return nil, fmt.Errorf("unknown storage node %d", rd.StorageNodeID) + } } c.vt.logStreams[logStreamID] = lsd diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index dd951a0ef..b55fe81c5 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -6,6 +6,7 @@ import ( "io" "math/rand" "slices" + "strconv" "sync" "testing" "time" @@ -20,9 +21,128 @@ import ( "github.com/kakao/varlog/pkg/varlog/x/mlsa" "github.com/kakao/varlog/pkg/varlogtest" "github.com/kakao/varlog/pkg/verrors" + "github.com/kakao/varlog/proto/snpb" "github.com/kakao/varlog/proto/varlogpb" ) +func TestVarlogTestAddLogStream(t *testing.T) { + const ( + cid = types.MinClusterID + repfactor = 3 + ) + + ctx := context.Background() + + addStorageNodes := func(t *testing.T, adm varlog.Admin) []snpb.StorageNodeMetadataDescriptor { + snmds := make([]snpb.StorageNodeMetadataDescriptor, repfactor) + for i := 0; i < repfactor; i++ { + snid := types.MinStorageNodeID + types.StorageNodeID(i) + snaddr := "sn" + strconv.Itoa(i+1) + snm, err := adm.AddStorageNode(ctx, snid, snaddr) + require.NoError(t, err) + snmds[i] = snm.StorageNodeMetadataDescriptor + } + return snmds + } + + tcs := []struct { + testf func(t *testing.T, adm varlog.Admin, tpid types.TopicID) + name string + }{ + { + name: "Succeed", + testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) { + _ = addStorageNodes(t, adm) + + _, err := adm.AddLogStream(ctx, tpid, nil) + require.NoError(t, err) + }, + }, + { + name: "CouldNotAddLogStream_NoSuchTopic", + testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) { + _, err := adm.AddLogStream(ctx, tpid+1, nil) + require.Error(t, err) + }, + }, + { + name: "CouldNotAddLogStream_NotEnoughStorageNodes", + testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) { + _, err := adm.AddLogStream(ctx, tpid, nil) + require.Error(t, err) + }, + }, + { + name: "CouldNotAddLogStream_DuplicateStorageNodeInReplicas", + testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) { + snmds := addStorageNodes(t, adm) + + replicas := make([]*varlogpb.ReplicaDescriptor, repfactor) + for i := range replicas { + replicas[i] = &varlogpb.ReplicaDescriptor{ + StorageNodeID: snmds[0].StorageNodeID, + StorageNodePath: snmds[0].Storages[0].Path, + } + } + _, err := adm.AddLogStream(ctx, tpid, replicas) + require.Error(t, err) + }, + }, + { + name: "CouldNotAddLogStream_UnknownStorageNode", + testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) { + snmds := addStorageNodes(t, adm) + + replicas := make([]*varlogpb.ReplicaDescriptor, repfactor) + for i := range replicas { + replicas[i] = &varlogpb.ReplicaDescriptor{ + StorageNodeID: snmds[i].StorageNodeID + 10, + StorageNodePath: snmds[i].Storages[0].Path, + } + } + _, err := adm.AddLogStream(ctx, tpid, replicas) + require.Error(t, err) + }, + }, + { + name: "CouldNotAddLogStream_ReplicasLengthLessThanReplicationFactor", + testf: func(t *testing.T, adm varlog.Admin, tpid types.TopicID) { + snmds := addStorageNodes(t, adm) + + replicas := make([]*varlogpb.ReplicaDescriptor, repfactor-1) + for i := 0; i < repfactor-1; i++ { + replicas[i] = &varlogpb.ReplicaDescriptor{ + StorageNodeID: snmds[i].StorageNodeID, + StorageNodePath: snmds[i].Storages[0].Path, + } + } + _, err := adm.AddLogStream(ctx, tpid, replicas) + require.Error(t, err) + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + vt, err := varlogtest.New( + varlogtest.WithClusterID(cid), + varlogtest.WithReplicationFactor(repfactor), + ) + require.NoError(t, err) + + adm := vt.Admin() + t.Cleanup(func() { + require.NoError(t, adm.Close()) + }) + + td, err := adm.AddTopic(ctx) + require.NoError(t, err) + + tc.testf(t, adm, td.TopicID) + }) + } +} + func TestVarlotTest_LogStreamAppender(t *testing.T) { const ( cid = types.ClusterID(1) diff --git a/proto/varlogpb/metadata.go b/proto/varlogpb/metadata.go index fb6c44ea4..87ec0c1fa 100644 --- a/proto/varlogpb/metadata.go +++ b/proto/varlogpb/metadata.go @@ -88,6 +88,28 @@ func (s TopicStatus) Deleted() bool { return s == TopicStatusDeleted } +func (lsd LogStreamDescriptor) Validate() error { + if len(lsd.Replicas) == 0 { + return errors.New("log stream descriptor: no replicas") + } + + const size = 3 + snids := make(map[types.StorageNodeID]struct{}, size) + for idx := range lsd.Replicas { + if lsd.Replicas[idx] == nil { + return errors.New("log stream descriptor: nil replica") + } + if err := lsd.Replicas[idx].Validate(); err != nil { + return fmt.Errorf("log stream descriptor: %w", err) + } + snids[lsd.Replicas[idx].StorageNodeID] = struct{}{} + } + if len(snids) != len(lsd.Replicas) { + return errors.New("log stream descriptor: duplicate storage nodes") + } + return nil +} + func (l *LogStreamDescriptor) Valid() bool { if l == nil || len(l.Replicas) == 0 { return false @@ -112,6 +134,16 @@ func (l *LogStreamDescriptor) IsReplica(snID types.StorageNodeID) bool { return false } +func (rd ReplicaDescriptor) Validate() error { + if rd.StorageNodeID.Invalid() { + return errors.New("replica descriptor: invalid storage node id") + } + if len(rd.StorageNodePath) == 0 { + return fmt.Errorf("replica descriptor: no path in storage node %d", rd.StorageNodeID) + } + return nil +} + func (r *ReplicaDescriptor) valid() bool { return r != nil && len(r.StorageNodePath) != 0 }