Skip to content

Commit

Permalink
feat(metarepos): add an upper limit for the number of topics in a clu…
Browse files Browse the repository at this point in the history
…ster

This patch adds an upper limit for the number of topics in a cluster. To do that, it adds a new CLI
flag `--max-topics-count` to the varlogmr executable.

When the metadata storage tries registering a new topic, it compares the current number of topics in
the cluster against the given upper limit. If the metadata repository rejects the RegisterTopic RPC,
gRPC code of the response is ResourceExhausted.

Resolves #295
  • Loading branch information
ijsong committed Dec 28, 2022
1 parent 6b8414b commit 77c6ee4
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 2 deletions.
9 changes: 9 additions & 0 deletions cmd/varlogmr/flags.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/internal/metarepos"
)

var (
Expand Down Expand Up @@ -123,6 +126,12 @@ var (
Envs: []string{"REPORTCOMMITTER_WRITE_BUFFER_SIZE"},
}

flagMaxTopicsCount = &cli.IntFlag{
Name: "max-topics-count",
Usage: "Maximum number of topics, infinity if it is negative",
Value: metarepos.DefaultMaxTopicsCount,
}

flagTelemetryCollectorName = flags.FlagDesc{
Name: "telemetry-collector-name",
Aliases: []string{"collector-name"},
Expand Down
2 changes: 2 additions & 0 deletions cmd/varlogmr/metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func start(c *cli.Context) error {
metarepos.WithReportCommitterReadBufferSize(int(readBufferSize)),
metarepos.WithReportCommitterWriteBufferSize(int(writeBufferSize)),
metarepos.WithPeers(c.StringSlice(flagPeers.Name)...),
metarepos.WithMaxTopicsCount(int32(c.Int(flagMaxTopicsCount.Name))),
metarepos.WithTelemetryCollectorName(c.String(flagTelemetryCollectorName.Name)),
metarepos.WithTelemetryCollectorEndpoint(c.String(flagTelemetryCollectorEndpoint.Name)),
metarepos.WithLogger(logger),
Expand Down Expand Up @@ -119,6 +120,7 @@ func initCLI() *cli.App {
flagPeers.StringSliceFlag(false, nil),
flagReportCommitterReadBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterReadBufferSize)),
flagReportCommitterWriteBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterWriteBufferSize)),
flagMaxTopicsCount,
flagTelemetryCollectorName.StringFlag(false, metarepos.DefaultTelemetryCollectorName),
flagTelemetryCollectorEndpoint.StringFlag(false, metarepos.DefaultTelmetryCollectorEndpoint),
flagLogDir.StringFlag(false, metarepos.DefaultLogDir),
Expand Down
10 changes: 10 additions & 0 deletions internal/metarepos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
DefaultReportCommitterReadBufferSize = 32 * 1024 // 32KB
DefaultReportCommitterWriteBufferSize = 32 * 1024 // 32KB

DefaultMaxTopicsCount = -1

UnusedRequestIndex uint64 = 0
)

Expand Down Expand Up @@ -78,6 +80,7 @@ type config struct {
reporterClientFac ReporterClientFactory
reportCommitterReadBufferSize int
reportCommitterWriteBufferSize int
maxTopicsCount int32
telemetryCollectorName string
telemetryCollectorEndpoint string
logger *zap.Logger
Expand All @@ -104,6 +107,7 @@ func newConfig(opts []Option) (config, error) {
promoteTick: DefaultPromoteTick,
reportCommitterReadBufferSize: DefaultReportCommitterReadBufferSize,
reportCommitterWriteBufferSize: DefaultReportCommitterWriteBufferSize,
maxTopicsCount: DefaultMaxTopicsCount,
telemetryCollectorName: DefaultTelemetryCollectorName,
telemetryCollectorEndpoint: DefaultTelmetryCollectorEndpoint,
logger: zap.NewNop(),
Expand Down Expand Up @@ -333,6 +337,12 @@ func WithReportCommitterWriteBufferSize(writeBufferSize int) Option {
})
}

func WithMaxTopicsCount(maxTopicsCount int32) Option {
return newFuncOption(func(cfg *config) {
cfg.maxTopicsCount = maxTopicsCount
})
}

func WithTelemetryCollectorName(telemetryCollectorName string) Option {
return newFuncOption(func(cfg *config) {
cfg.telemetryCollectorName = telemetryCollectorName
Expand Down
1 change: 1 addition & 0 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository {
}

mr.storage = NewMetadataStorage(mr.sendAck, cfg.snapCount, mr.logger.Named("storage"))
mr.storage.limits.maxTopicsCount = mr.maxTopicsCount
mr.membership = mr.storage

mr.listenNotifyC = make(chan struct{})
Expand Down
48 changes: 48 additions & 0 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"go.uber.org/goleak"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -2439,6 +2441,52 @@ func TestMRUnregisterTopic(t *testing.T) {
})
}

func TestMetadataRepository_MaxTopicsCount(t *testing.T) {
const numNodes = 1
const repFactor = 1
const increaseUncommit = false

Convey("MaxTopicsCount", t, func(C) {
clus := newMetadataRepoCluster(numNodes, repFactor, increaseUncommit)
Reset(func() {
clus.closeNoErrors(t)
})

So(clus.Start(), ShouldBeNil)
So(testutil.CompareWaitN(10, func() bool {
return clus.healthCheckAll()
}), ShouldBeTrue)

mr := clus.nodes[0]

Convey("Limit is zero", func(C) {
mr.storage.limits.maxTopicsCount = 0
err := mr.RegisterTopic(context.Background(), 1)
So(err, ShouldNotBeNil)
})

Convey("Limit is one", func(C) {
mr.storage.limits.maxTopicsCount = 1

err := mr.RegisterTopic(context.Background(), 1)
So(err, ShouldBeNil)

err = mr.RegisterTopic(context.Background(), 2)
So(err, ShouldNotBeNil)
So(status.Code(err), ShouldEqual, codes.ResourceExhausted)

err = mr.RegisterTopic(context.Background(), 1)
So(err, ShouldBeNil)

err = mr.UnregisterTopic(context.TODO(), 1)
So(err, ShouldBeNil)

err = mr.RegisterTopic(context.Background(), 2)
So(err, ShouldBeNil)
})
})
}

func TestMRTopicLastHighWatermark(t *testing.T) {
Convey("given metadata repository with multiple topics", t, func(ctx C) {
nrTopics := 3
Expand Down
29 changes: 28 additions & 1 deletion internal/metarepos/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"sync/atomic"

"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/mathutil"
Expand Down Expand Up @@ -127,6 +129,10 @@ type MetadataStorage struct {
runner *runner.Runner
running atomicutil.AtomicBool

limits struct {
maxTopicsCount int32
}

logger *zap.Logger
}

Expand All @@ -141,6 +147,7 @@ func NewMetadataStorage(cb func(uint64, uint64, error), snapCount uint64, logger
logger: logger,
}
ms.snapCount = snapCount
ms.limits.maxTopicsCount = DefaultMaxTopicsCount

ms.origStateMachine = &mrpb.MetadataRepositoryDescriptor{}
ms.origStateMachine.Metadata = &varlogpb.MetadataDescriptor{}
Expand Down Expand Up @@ -603,11 +610,31 @@ func (ms *MetadataStorage) registerTopic(topic *varlogpb.TopicDescriptor) error
return nil
}

_, cur := ms.getStateMachine()
tpids := make(map[types.TopicID]struct{})
pre, cur := ms.getStateMachine()

ms.mtMu.Lock()
defer ms.mtMu.Unlock()

for _, tp := range pre.Metadata.Topics {
if !tp.Status.Deleted() {
tpids[tp.TopicID] = struct{}{}
}
}
if pre != cur {
for _, tp := range cur.Metadata.Topics {
if tp.Status.Deleted() {
delete(tpids, tp.TopicID)
continue
}
tpids[tp.TopicID] = struct{}{}
}
}

if ms.limits.maxTopicsCount >= 0 && len(tpids) >= int(ms.limits.maxTopicsCount) {
return status.Errorf(codes.ResourceExhausted, "too many topics, limits %d", len(tpids))
}

if err := cur.Metadata.UpsertTopic(topic); err != nil {
return err
}
Expand Down
51 changes: 51 additions & 0 deletions internal/metarepos/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import (
"time"

. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/testutil"
Expand Down Expand Up @@ -1919,6 +1923,53 @@ func TestStoragUnregisterTopic(t *testing.T) {
})
}

func TestStorage_MaxTopicsCount(t *testing.T) {
tcs := []struct {
name string
maxTopicsCount int32
testf func(t *testing.T, ms *MetadataStorage)
}{
{
name: "LimitOne",
maxTopicsCount: 1,
testf: func(t *testing.T, ms *MetadataStorage) {
err := ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 1})
require.NoError(t, err)

err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 2})
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))

err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 1})
require.NoError(t, err)

err = ms.unregisterTopic(1)
require.NoError(t, err)

err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 2})
require.NoError(t, err)
},
},
{
name: "LimitZero",
maxTopicsCount: 0,
testf: func(t *testing.T, ms *MetadataStorage) {
err := ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 1})
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ms := NewMetadataStorage(nil, DefaultSnapshotCount, zaptest.NewLogger(t))
ms.limits.maxTopicsCount = tc.maxTopicsCount
tc.testf(t, ms)
})
}
}

func TestStorageSortedTopicLogStreamIDs(t *testing.T) {
Convey("UncommitReport should be committed", t, func(ctx C) {
/*
Expand Down
10 changes: 9 additions & 1 deletion pkg/mrc/metadata_repository_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

"github.com/pkg/errors"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -100,7 +102,13 @@ func (c *metadataRepositoryClient) RegisterTopic(ctx context.Context, topicID ty
}

_, err := c.client.RegisterTopic(ctx, req)
return verrors.FromStatusError(errors.WithStack(err))
if err != nil {
if code := status.Code(err); code == codes.ResourceExhausted {
return err
}
return verrors.FromStatusError(errors.WithStack(err))
}
return nil
}

func (c *metadataRepositoryClient) UnregisterTopic(ctx context.Context, topicID types.TopicID) error {
Expand Down

0 comments on commit 77c6ee4

Please sign in to comment.