Skip to content

Commit

Permalink
Merge pull request #294 from ijsong/limits-lsreplica
Browse files Browse the repository at this point in the history
feat(storagenode): add an upper limit of log stream replicas count in a storage node
  • Loading branch information
ijsong authored Dec 28, 2022
2 parents 0a167a2 + 2cfc8bf commit 6b8414b
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 1 deletion.
1 change: 1 addition & 0 deletions cmd/varlogsn/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func newStartCommand() *cli.Command {
flagLogStreamExecutorWriteQueueCapacity.IntFlag(false, logstream.DefaultWriteQueueCapacity),
flagLogStreamExecutorCommitQueueCapacity.IntFlag(false, logstream.DefaultCommitQueueCapacity),
flagLogStreamExecutorReplicateclientQueueCapacity.IntFlag(false, logstream.DefaultReplicateClientQueueCapacity),
flagMaxLogStreamReplicasCount,

// storage options
flagStorageDisableWAL.BoolFlag(),
Expand Down
9 changes: 9 additions & 0 deletions cmd/varlogsn/flags.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/internal/storagenode"
)

var (
Expand All @@ -28,6 +31,12 @@ var (
Envs: []string{"VOLUMES", "VOLUME"},
}

flagMaxLogStreamReplicasCount = &cli.IntFlag{
Name: "max-logstream-replicas-count",
Usage: "The maximum number of log stream replicas in a storage node, infinity if a negative value",
Value: storagenode.DefaultMaxLogStreamReplicasCount,
}

// flags for grpc options.
flagServerReadBufferSize = flags.FlagDesc{
Name: "server-read-buffer-size",
Expand Down
1 change: 1 addition & 0 deletions cmd/varlogsn/varlogsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func start(c *cli.Context) error {
logstream.WithCommitQueueCapacity(c.Int(flagLogStreamExecutorCommitQueueCapacity.Name)),
logstream.WithReplicateClientQueueCapacity(c.Int(flagLogStreamExecutorReplicateclientQueueCapacity.Name)),
),
storagenode.WithMaxLogStreamReplicasCount(int32(c.Int(flagMaxLogStreamReplicasCount.Name))),
storagenode.WithDefaultStorageOptions(storageOpts...),
storagenode.WithLogger(logger),
)
Expand Down
2 changes: 1 addition & 1 deletion internal/storagenode/client/management_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *ManagementClient) AddLogStreamReplica(ctx context.Context, tpid types.T
StorageNodePath: snpath,
})
if err != nil {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.Wrap(verrors.FromStatusError(err), "snmcl")
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}
return rsp.LogStreamReplica, nil
}
Expand Down
9 changes: 9 additions & 0 deletions internal/storagenode/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DefaultServerMaxRecvSize = 4 << 20
DefaultReplicateClientReadBufferSize = 32 << 10
DefaultReplicateClientWriteBufferSize = 32 << 10
DefaultMaxLogStreamReplicasCount = -1
)

type config struct {
Expand All @@ -33,6 +34,7 @@ type config struct {
grpcServerMaxRecvMsgSize int64
replicateClientReadBufferSize int64
replicateClientWriteBufferSize int64
maxLogStreamReplicasCount int32
volumes []string
defaultLogStreamExecutorOptions []logstream.ExecutorOption
pprofOpts []pprof.Option
Expand All @@ -47,6 +49,7 @@ func newConfig(opts []Option) (config, error) {
grpcServerMaxRecvMsgSize: DefaultServerMaxRecvSize,
replicateClientReadBufferSize: DefaultReplicateClientReadBufferSize,
replicateClientWriteBufferSize: DefaultReplicateClientWriteBufferSize,
maxLogStreamReplicasCount: DefaultMaxLogStreamReplicasCount,
logger: zap.NewNop(),
}
for _, opt := range opts {
Expand Down Expand Up @@ -182,6 +185,12 @@ func WithDefaultLogStreamExecutorOptions(defaultLSEOptions ...logstream.Executor
})
}

func WithMaxLogStreamReplicasCount(maxLogStreamReplicasCount int32) Option {
return newFuncOption(func(cfg *config) {
cfg.maxLogStreamReplicasCount = maxLogStreamReplicasCount
})
}

func WithVolumes(volumes ...string) Option {
return newFuncOption(func(cfg *config) {
cfg.volumes = volumes
Expand Down
13 changes: 13 additions & 0 deletions internal/storagenode/storagenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"time"

grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -21,8 +22,10 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/internal/storage"
"github.com/kakao/varlog/internal/storagenode/executorsmap"
Expand Down Expand Up @@ -67,6 +70,10 @@ type StorageNode struct {
sf singleflight.Group

startTime time.Time

limits struct {
logStreamReplicasCount atomic.Int32
}
}

func NewStorageNode(opts ...Option) (*StorageNode, error) {
Expand Down Expand Up @@ -301,6 +308,11 @@ func (sn *StorageNode) addLogStreamReplica(ctx context.Context, tpid types.Topic
}

func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID, lsid types.LogStreamID, lsPath string) (*logstream.Executor, error) {
if added := sn.limits.logStreamReplicasCount.Add(1); sn.maxLogStreamReplicasCount >= 0 && added > sn.maxLogStreamReplicasCount {
sn.limits.logStreamReplicasCount.Add(-1)
return nil, status.Errorf(codes.ResourceExhausted, "storagenode: too many logstream replicas (tpid=%d, lsid=%d)", tpid, lsid)
}

lsm, err := telemetry.RegisterLogStreamMetrics(sn.metrics, lsid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -362,6 +374,7 @@ func (sn *StorageNode) removeLogStreamReplica(_ context.Context, tpid types.Topi
if err := os.RemoveAll(lse.Path()); err != nil {
sn.logger.Warn("error while removing log stream path")
}
sn.limits.logStreamReplicasCount.Add(-1)
return nil
}

Expand Down
62 changes: 62 additions & 0 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/internal/reportcommitter"
"github.com/kakao/varlog/internal/storage"
Expand Down Expand Up @@ -1131,3 +1133,63 @@ func TestStorageNode_Sync(t *testing.T) {
})
}
}

func TestStorageNode_MaxLogStreamReplicasCount(t *testing.T) {
ctx := context.Background()

tcs := []struct {
name string
maxLogStreamReplicasCount int32
testf func(t *testing.T, snpath string, mc *client.ManagementClient)
}{
{
name: "LimitOne",
maxLogStreamReplicasCount: 1,
testf: func(t *testing.T, snpath string, mc *client.ManagementClient) {
_, err := mc.AddLogStreamReplica(ctx, 1, 1, snpath)
require.NoError(t, err)

_, err = mc.AddLogStreamReplica(ctx, 1, 2, snpath)
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))

err = mc.RemoveLogStream(ctx, 1, 1)
require.NoError(t, err)

_, err = mc.AddLogStreamReplica(ctx, 1, 2, snpath)
require.NoError(t, err)
},
},
{
name: "LimitZero",
maxLogStreamReplicasCount: 0,
testf: func(t *testing.T, snpath string, mc *client.ManagementClient) {
_, err := mc.AddLogStreamReplica(ctx, 1, 1, snpath)
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
sn := TestNewSimpleStorageNode(t, WithMaxLogStreamReplicasCount(tc.maxLogStreamReplicasCount))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_ = sn.Serve()
}()
defer func() {
assert.NoError(t, sn.Close())
wg.Wait()
}()

addr := TestGetAdvertiseAddress(t, sn)
mc, mcClose := TestNewManagementClient(t, sn.cid, sn.snid, addr)
defer mcClose()

tc.testf(t, sn.snPaths[0], mc)
})
}
}

0 comments on commit 6b8414b

Please sign in to comment.