From 2cfc8bf8364092d3d93c21f49f544431ddde3a7e Mon Sep 17 00:00:00 2001
From: Injun Song
Date: Thu, 22 Dec 2022 17:33:09 +0900
Subject: [PATCH] feat(storagenode): add an upper limit of log stream replicas count in a storage node

This patch adds a new CLI flag, `--max-logstream-replicas-count`, which limits
the number of log stream replicas in a storage node. The default value is -1,
which means there is no upper limit; if it is set to zero, the storage node
cannot create any log stream replica.

A storage node keeps a counter of its log stream replicas. Whenever it is asked
to create a new replica, it checks whether doing so would exceed the upper
limit. If not, it creates the replica and increments the counter; conversely,
it decrements the counter whenever a log stream replica is removed.

This approach has a limitation: it cannot distinguish between normal and
garbage replicas. Fundamentally, a storage node has no way to tell them apart,
so this PR does not address that.

Resolves #293
---
(A standalone sketch of the counter check-and-rollback pattern follows the
diff.)

 cmd/varlogsn/cli.go                          |  1 +
 cmd/varlogsn/flags.go                        |  9 +++
 cmd/varlogsn/varlogsn.go                     |  1 +
 .../storagenode/client/management_client.go  |  2 +-
 internal/storagenode/config.go               |  9 +++
 internal/storagenode/storagenode.go          | 13 ++++
 internal/storagenode/storagenode_test.go     | 62 +++++++++++++++++++
 7 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/cmd/varlogsn/cli.go b/cmd/varlogsn/cli.go
index efe9db103..5c2be02c7 100644
--- a/cmd/varlogsn/cli.go
+++ b/cmd/varlogsn/cli.go
@@ -54,6 +54,7 @@ func newStartCommand() *cli.Command {
 		flagLogStreamExecutorWriteQueueCapacity.IntFlag(false, logstream.DefaultWriteQueueCapacity),
 		flagLogStreamExecutorCommitQueueCapacity.IntFlag(false, logstream.DefaultCommitQueueCapacity),
 		flagLogStreamExecutorReplicateclientQueueCapacity.IntFlag(false, logstream.DefaultReplicateClientQueueCapacity),
+		flagMaxLogStreamReplicasCount,
 
 		// storage options
 		flagStorageDisableWAL.BoolFlag(),
diff --git a/cmd/varlogsn/flags.go b/cmd/varlogsn/flags.go
index a39a31e6c..16decb243 100644
--- a/cmd/varlogsn/flags.go
+++ b/cmd/varlogsn/flags.go
@@ -1,7 +1,10 @@
 package main
 
 import (
+	"github.com/urfave/cli/v2"
+
 	"github.com/kakao/varlog/internal/flags"
+	"github.com/kakao/varlog/internal/storagenode"
 )
 
 var (
@@ -28,6 +31,12 @@ var (
 		Envs: []string{"VOLUMES", "VOLUME"},
 	}
 
+	flagMaxLogStreamReplicasCount = &cli.IntFlag{
+		Name:  "max-logstream-replicas-count",
+		Usage: "The maximum number of log stream replicas in a storage node, infinity if a negative value",
+		Value: storagenode.DefaultMaxLogStreamReplicasCount,
+	}
+
 	// flags for grpc options.
 	flagServerReadBufferSize = flags.FlagDesc{
 		Name: "server-read-buffer-size",
diff --git a/cmd/varlogsn/varlogsn.go b/cmd/varlogsn/varlogsn.go
index 87e56d5ed..5e3eb9c30 100644
--- a/cmd/varlogsn/varlogsn.go
+++ b/cmd/varlogsn/varlogsn.go
@@ -171,6 +171,7 @@ func start(c *cli.Context) error {
 			logstream.WithCommitQueueCapacity(c.Int(flagLogStreamExecutorCommitQueueCapacity.Name)),
 			logstream.WithReplicateClientQueueCapacity(c.Int(flagLogStreamExecutorReplicateclientQueueCapacity.Name)),
 		),
+		storagenode.WithMaxLogStreamReplicasCount(int32(c.Int(flagMaxLogStreamReplicasCount.Name))),
 		storagenode.WithDefaultStorageOptions(storageOpts...),
 		storagenode.WithLogger(logger),
 	)
diff --git a/internal/storagenode/client/management_client.go b/internal/storagenode/client/management_client.go
index 47cf4479d..c38b1b91e 100644
--- a/internal/storagenode/client/management_client.go
+++ b/internal/storagenode/client/management_client.go
@@ -96,7 +96,7 @@ func (c *ManagementClient) AddLogStreamReplica(ctx context.Context, tpid types.T
 		StorageNodePath: snpath,
 	})
 	if err != nil {
-		return snpb.LogStreamReplicaMetadataDescriptor{}, errors.Wrap(verrors.FromStatusError(err), "snmcl")
+		return snpb.LogStreamReplicaMetadataDescriptor{}, err
 	}
 	return rsp.LogStreamReplica, nil
 }
diff --git a/internal/storagenode/config.go b/internal/storagenode/config.go
index 558db6dbe..443a1f8e2 100644
--- a/internal/storagenode/config.go
+++ b/internal/storagenode/config.go
@@ -20,6 +20,7 @@ const (
 	DefaultServerMaxRecvSize              = 4 << 20
 	DefaultReplicateClientReadBufferSize  = 32 << 10
 	DefaultReplicateClientWriteBufferSize = 32 << 10
+	DefaultMaxLogStreamReplicasCount      = -1
 )
 
 type config struct {
@@ -33,6 +34,7 @@ type config struct {
 	grpcServerMaxRecvMsgSize        int64
 	replicateClientReadBufferSize   int64
 	replicateClientWriteBufferSize  int64
+	maxLogStreamReplicasCount       int32
 	volumes                         []string
 	defaultLogStreamExecutorOptions []logstream.ExecutorOption
 	pprofOpts                       []pprof.Option
@@ -47,6 +49,7 @@ func newConfig(opts []Option) (config, error) {
 		grpcServerMaxRecvMsgSize:       DefaultServerMaxRecvSize,
 		replicateClientReadBufferSize:  DefaultReplicateClientReadBufferSize,
 		replicateClientWriteBufferSize: DefaultReplicateClientWriteBufferSize,
+		maxLogStreamReplicasCount:      DefaultMaxLogStreamReplicasCount,
 		logger: zap.NewNop(),
 	}
 	for _, opt := range opts {
@@ -182,6 +185,12 @@ func WithDefaultLogStreamExecutorOptions(defaultLSEOptions ...logstream.Executor
 	})
 }
 
+func WithMaxLogStreamReplicasCount(maxLogStreamReplicasCount int32) Option {
+	return newFuncOption(func(cfg *config) {
+		cfg.maxLogStreamReplicasCount = maxLogStreamReplicasCount
+	})
+}
+
 func WithVolumes(volumes ...string) Option {
 	return newFuncOption(func(cfg *config) {
 		cfg.volumes = volumes
diff --git a/internal/storagenode/storagenode.go b/internal/storagenode/storagenode.go
index 3ec97a4c6..b3aab79fc 100644
--- a/internal/storagenode/storagenode.go
+++ b/internal/storagenode/storagenode.go
@@ -9,6 +9,7 @@ import (
 	"path"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -21,8 +22,10 @@ import (
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/health"
 	"google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
 
 	"github.com/kakao/varlog/internal/storage"
 	"github.com/kakao/varlog/internal/storagenode/executorsmap"
@@ -67,6 +70,10 @@ type StorageNode struct {
 	sf singleflight.Group
 
 	startTime time.Time
+
+	limits struct {
+		logStreamReplicasCount atomic.Int32
+	}
 }
 
 func NewStorageNode(opts ...Option) (*StorageNode, error) {
@@ -301,6 +308,11 @@ func (sn *StorageNode) addLogStreamReplica(ctx context.Context, tpid types.Topic
 }
 
 func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID, lsid types.LogStreamID, lsPath string) (*logstream.Executor, error) {
+	if added := sn.limits.logStreamReplicasCount.Add(1); sn.maxLogStreamReplicasCount >= 0 && added > sn.maxLogStreamReplicasCount {
+		sn.limits.logStreamReplicasCount.Add(-1)
+		return nil, status.Errorf(codes.ResourceExhausted, "storagenode: too many logstream replicas (tpid=%d, lsid=%d)", tpid, lsid)
+	}
+
 	lsm, err := telemetry.RegisterLogStreamMetrics(sn.metrics, lsid)
 	if err != nil {
 		return nil, err
@@ -362,6 +374,7 @@ func (sn *StorageNode) removeLogStreamReplica(_ context.Context, tpid types.Topi
 	if err := os.RemoveAll(lse.Path()); err != nil {
 		sn.logger.Warn("error while removing log stream path")
 	}
+	sn.limits.logStreamReplicasCount.Add(-1)
 	return nil
 }
 
diff --git a/internal/storagenode/storagenode_test.go b/internal/storagenode/storagenode_test.go
index f9eb88c80..7e0282a6c 100644
--- a/internal/storagenode/storagenode_test.go
+++ b/internal/storagenode/storagenode_test.go
@@ -13,6 +13,8 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/kakao/varlog/internal/reportcommitter"
 	"github.com/kakao/varlog/internal/storage"
@@ -1131,3 +1133,63 @@ func TestStorageNode_Sync(t *testing.T) {
 		})
 	}
 }
+
+func TestStorageNode_MaxLogStreamReplicasCount(t *testing.T) {
+	ctx := context.Background()
+
+	tcs := []struct {
+		name                      string
+		maxLogStreamReplicasCount int32
+		testf                     func(t *testing.T, snpath string, mc *client.ManagementClient)
+	}{
+		{
+			name:                      "LimitOne",
+			maxLogStreamReplicasCount: 1,
+			testf: func(t *testing.T, snpath string, mc *client.ManagementClient) {
+				_, err := mc.AddLogStreamReplica(ctx, 1, 1, snpath)
+				require.NoError(t, err)
+
+				_, err = mc.AddLogStreamReplica(ctx, 1, 2, snpath)
+				require.Error(t, err)
+				require.Equal(t, codes.ResourceExhausted, status.Code(err))
+
+				err = mc.RemoveLogStream(ctx, 1, 1)
+				require.NoError(t, err)
+
+				_, err = mc.AddLogStreamReplica(ctx, 1, 2, snpath)
+				require.NoError(t, err)
+			},
+		},
+		{
+			name:                      "LimitZero",
+			maxLogStreamReplicasCount: 0,
+			testf: func(t *testing.T, snpath string, mc *client.ManagementClient) {
+				_, err := mc.AddLogStreamReplica(ctx, 1, 1, snpath)
+				require.Error(t, err)
+				require.Equal(t, codes.ResourceExhausted, status.Code(err))
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			sn := TestNewSimpleStorageNode(t, WithMaxLogStreamReplicasCount(tc.maxLogStreamReplicasCount))
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				_ = sn.Serve()
+			}()
+			defer func() {
+				assert.NoError(t, sn.Close())
+				wg.Wait()
+			}()
+
+			addr := TestGetAdvertiseAddress(t, sn)
+			mc, mcClose := TestNewManagementClient(t, sn.cid, sn.snid, addr)
+			defer mcClose()
+
+			tc.testf(t, sn.snPaths[0], mc)
+		})
+	}
+}
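
Reviewer note: the admission check in runLogStreamReplica above is lock-free. It
optimistically increments limits.logStreamReplicasCount, and if the new value
exceeds a non-negative maxLogStreamReplicasCount it rolls the increment back and
returns codes.ResourceExhausted; removeLogStreamReplica decrements the counter
after the replica's directory is removed. The program below is a minimal,
self-contained sketch of that check-and-rollback pattern. The replicaLimiter
type, its method names, and the main driver are illustrative only and are not
part of this patch; a negative limit means no upper limit, matching
DefaultMaxLogStreamReplicasCount (-1).

package main

import (
	"fmt"
	"sync/atomic"
)

// replicaLimiter is a hypothetical helper that mirrors the pattern used in
// runLogStreamReplica and removeLogStreamReplica: optimistically increment,
// roll back on overflow, and decrement on removal. A negative max means no
// upper limit, like DefaultMaxLogStreamReplicasCount (-1).
type replicaLimiter struct {
	max   int32
	count atomic.Int32
}

// acquire reserves a slot for one more replica; it reports false when the
// limit would be exceeded and leaves the counter unchanged in that case.
func (l *replicaLimiter) acquire() bool {
	if added := l.count.Add(1); l.max >= 0 && added > l.max {
		l.count.Add(-1) // roll back the optimistic increment
		return false
	}
	return true
}

// release frees a slot when a replica is removed.
func (l *replicaLimiter) release() {
	l.count.Add(-1)
}

func main() {
	l := &replicaLimiter{max: 1}
	fmt.Println(l.acquire()) // true: the first replica fits
	fmt.Println(l.acquire()) // false: the limit of one is exceeded
	l.release()              // the first replica is removed
	fmt.Println(l.acquire()) // true: a slot is free again
}

This is the behavior TestStorageNode_MaxLogStreamReplicasCount exercises end to
end: with --max-logstream-replicas-count=1 the second AddLogStreamReplica call
fails with ResourceExhausted until a replica is removed, and with a limit of
zero no replica can be created at all.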