diff --git a/cmd/varlogmr/flags.go b/cmd/varlogmr/flags.go index 9063c3462..467648cc8 100644 --- a/cmd/varlogmr/flags.go +++ b/cmd/varlogmr/flags.go @@ -1,7 +1,10 @@ package main import ( + "github.com/urfave/cli/v2" + "github.com/kakao/varlog/internal/flags" + "github.com/kakao/varlog/internal/metarepos" ) var ( @@ -123,6 +126,12 @@ var ( Envs: []string{"REPORTCOMMITTER_WRITE_BUFFER_SIZE"}, } + flagMaxTopicsCount = &cli.IntFlag{ + Name: "max-topics-count", + Usage: "Maximum number of topics, infinity if it is negative", + Value: metarepos.DefaultMaxTopicsCount, + } + flagTelemetryCollectorName = flags.FlagDesc{ Name: "telemetry-collector-name", Aliases: []string{"collector-name"}, diff --git a/cmd/varlogmr/metadata_repository.go b/cmd/varlogmr/metadata_repository.go index 7f2782546..d0742f85f 100644 --- a/cmd/varlogmr/metadata_repository.go +++ b/cmd/varlogmr/metadata_repository.go @@ -67,6 +67,7 @@ func start(c *cli.Context) error { metarepos.WithReportCommitterReadBufferSize(int(readBufferSize)), metarepos.WithReportCommitterWriteBufferSize(int(writeBufferSize)), metarepos.WithPeers(c.StringSlice(flagPeers.Name)...), + metarepos.WithMaxTopicsCount(int32(c.Int(flagMaxTopicsCount.Name))), metarepos.WithTelemetryCollectorName(c.String(flagTelemetryCollectorName.Name)), metarepos.WithTelemetryCollectorEndpoint(c.String(flagTelemetryCollectorEndpoint.Name)), metarepos.WithLogger(logger), @@ -119,6 +120,7 @@ func initCLI() *cli.App { flagPeers.StringSliceFlag(false, nil), flagReportCommitterReadBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterReadBufferSize)), flagReportCommitterWriteBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterWriteBufferSize)), + flagMaxTopicsCount, flagTelemetryCollectorName.StringFlag(false, metarepos.DefaultTelemetryCollectorName), flagTelemetryCollectorEndpoint.StringFlag(false, metarepos.DefaultTelmetryCollectorEndpoint), flagLogDir.StringFlag(false, metarepos.DefaultLogDir), diff --git a/internal/metarepos/config.go b/internal/metarepos/config.go index e24af87e1..781e2a8de 100644 --- a/internal/metarepos/config.go +++ b/internal/metarepos/config.go @@ -37,6 +37,8 @@ const ( DefaultReportCommitterReadBufferSize = 32 * 1024 // 32KB DefaultReportCommitterWriteBufferSize = 32 * 1024 // 32KB + DefaultMaxTopicsCount = -1 + UnusedRequestIndex uint64 = 0 ) @@ -78,6 +80,7 @@ type config struct { reporterClientFac ReporterClientFactory reportCommitterReadBufferSize int reportCommitterWriteBufferSize int + maxTopicsCount int32 telemetryCollectorName string telemetryCollectorEndpoint string logger *zap.Logger @@ -104,6 +107,7 @@ func newConfig(opts []Option) (config, error) { promoteTick: DefaultPromoteTick, reportCommitterReadBufferSize: DefaultReportCommitterReadBufferSize, reportCommitterWriteBufferSize: DefaultReportCommitterWriteBufferSize, + maxTopicsCount: DefaultMaxTopicsCount, telemetryCollectorName: DefaultTelemetryCollectorName, telemetryCollectorEndpoint: DefaultTelmetryCollectorEndpoint, logger: zap.NewNop(), @@ -333,6 +337,12 @@ func WithReportCommitterWriteBufferSize(writeBufferSize int) Option { }) } +func WithMaxTopicsCount(maxTopicsCount int32) Option { + return newFuncOption(func(cfg *config) { + cfg.maxTopicsCount = maxTopicsCount + }) +} + func WithTelemetryCollectorName(telemetryCollectorName string) Option { return newFuncOption(func(cfg *config) { cfg.telemetryCollectorName = telemetryCollectorName diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 6aa1b3270..8dd161274 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -118,6 +118,7 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository { } mr.storage = NewMetadataStorage(mr.sendAck, cfg.snapCount, mr.logger.Named("storage")) + mr.storage.limits.maxTopicsCount = mr.maxTopicsCount mr.membership = mr.storage mr.listenNotifyC = make(chan struct{}) diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index 4494586a6..4ff87fc43 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -16,7 +16,9 @@ import ( "go.uber.org/goleak" "go.uber.org/multierr" "go.uber.org/zap" + "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/types" @@ -2439,6 +2441,52 @@ func TestMRUnregisterTopic(t *testing.T) { }) } +func TestMetadataRepository_MaxTopicsCount(t *testing.T) { + const numNodes = 1 + const repFactor = 1 + const increaseUncommit = false + + Convey("MaxTopicsCount", t, func(C) { + clus := newMetadataRepoCluster(numNodes, repFactor, increaseUncommit) + Reset(func() { + clus.closeNoErrors(t) + }) + + So(clus.Start(), ShouldBeNil) + So(testutil.CompareWaitN(10, func() bool { + return clus.healthCheckAll() + }), ShouldBeTrue) + + mr := clus.nodes[0] + + Convey("Limit is zero", func(C) { + mr.storage.limits.maxTopicsCount = 0 + err := mr.RegisterTopic(context.Background(), 1) + So(err, ShouldNotBeNil) + }) + + Convey("Limit is one", func(C) { + mr.storage.limits.maxTopicsCount = 1 + + err := mr.RegisterTopic(context.Background(), 1) + So(err, ShouldBeNil) + + err = mr.RegisterTopic(context.Background(), 2) + So(err, ShouldNotBeNil) + So(status.Code(err), ShouldEqual, codes.ResourceExhausted) + + err = mr.RegisterTopic(context.Background(), 1) + So(err, ShouldBeNil) + + err = mr.UnregisterTopic(context.TODO(), 1) + So(err, ShouldBeNil) + + err = mr.RegisterTopic(context.Background(), 2) + So(err, ShouldBeNil) + }) + }) +} + func TestMRTopicLastHighWatermark(t *testing.T) { Convey("given metadata repository with multiple topics", t, func(ctx C) { nrTopics := 3 diff --git a/internal/metarepos/storage.go b/internal/metarepos/storage.go index e4200ac3a..2eba3c283 100644 --- a/internal/metarepos/storage.go +++ b/internal/metarepos/storage.go @@ -10,9 +10,11 @@ import ( "sync/atomic" "github.com/gogo/protobuf/proto" + "github.com/gogo/status" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" "go.uber.org/zap" + "google.golang.org/grpc/codes" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/mathutil" @@ -127,6 +129,10 @@ type MetadataStorage struct { runner *runner.Runner running atomicutil.AtomicBool + limits struct { + maxTopicsCount int32 + } + logger *zap.Logger } @@ -141,6 +147,7 @@ func NewMetadataStorage(cb func(uint64, uint64, error), snapCount uint64, logger logger: logger, } ms.snapCount = snapCount + ms.limits.maxTopicsCount = DefaultMaxTopicsCount ms.origStateMachine = &mrpb.MetadataRepositoryDescriptor{} ms.origStateMachine.Metadata = &varlogpb.MetadataDescriptor{} @@ -603,11 +610,31 @@ func (ms *MetadataStorage) registerTopic(topic *varlogpb.TopicDescriptor) error return nil } - _, cur := ms.getStateMachine() + tpids := make(map[types.TopicID]struct{}) + pre, cur := ms.getStateMachine() ms.mtMu.Lock() defer ms.mtMu.Unlock() + for _, tp := range pre.Metadata.Topics { + if !tp.Status.Deleted() { + tpids[tp.TopicID] = struct{}{} + } + } + if pre != cur { + for _, tp := range cur.Metadata.Topics { + if tp.Status.Deleted() { + delete(tpids, tp.TopicID) + continue + } + tpids[tp.TopicID] = struct{}{} + } + } + + if ms.limits.maxTopicsCount >= 0 && len(tpids) >= int(ms.limits.maxTopicsCount) { + return status.Errorf(codes.ResourceExhausted, "too many topics, limits %d", len(tpids)) + } + if err := cur.Metadata.UpsertTopic(topic); err != nil { return err } diff --git a/internal/metarepos/storage_test.go b/internal/metarepos/storage_test.go index 2bc7172a4..3e3d8438d 100644 --- a/internal/metarepos/storage_test.go +++ b/internal/metarepos/storage_test.go @@ -9,8 +9,12 @@ import ( "time" . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/raftpb" "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/testutil" @@ -1919,6 +1923,53 @@ func TestStoragUnregisterTopic(t *testing.T) { }) } +func TestStorage_MaxTopicsCount(t *testing.T) { + tcs := []struct { + name string + maxTopicsCount int32 + testf func(t *testing.T, ms *MetadataStorage) + }{ + { + name: "LimitOne", + maxTopicsCount: 1, + testf: func(t *testing.T, ms *MetadataStorage) { + err := ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 1}) + require.NoError(t, err) + + err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 2}) + require.Error(t, err) + require.Equal(t, codes.ResourceExhausted, status.Code(err)) + + err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 1}) + require.NoError(t, err) + + err = ms.unregisterTopic(1) + require.NoError(t, err) + + err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 2}) + require.NoError(t, err) + }, + }, + { + name: "LimitZero", + maxTopicsCount: 0, + testf: func(t *testing.T, ms *MetadataStorage) { + err := ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: 1}) + require.Error(t, err) + require.Equal(t, codes.ResourceExhausted, status.Code(err)) + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ms := NewMetadataStorage(nil, DefaultSnapshotCount, zaptest.NewLogger(t)) + ms.limits.maxTopicsCount = tc.maxTopicsCount + tc.testf(t, ms) + }) + } +} + func TestStorageSortedTopicLogStreamIDs(t *testing.T) { Convey("UncommitReport should be committed", t, func(ctx C) { /* diff --git a/pkg/mrc/metadata_repository_client.go b/pkg/mrc/metadata_repository_client.go index 435a98a4c..108c397d5 100644 --- a/pkg/mrc/metadata_repository_client.go +++ b/pkg/mrc/metadata_repository_client.go @@ -7,7 +7,9 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" + "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/types" @@ -100,7 +102,13 @@ func (c *metadataRepositoryClient) RegisterTopic(ctx context.Context, topicID ty } _, err := c.client.RegisterTopic(ctx, req) - return verrors.FromStatusError(errors.WithStack(err)) + if err != nil { + if code := status.Code(err); code == codes.ResourceExhausted { + return err + } + return verrors.FromStatusError(errors.WithStack(err)) + } + return nil } func (c *metadataRepositoryClient) UnregisterTopic(ctx context.Context, topicID types.TopicID) error {