diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index a1477ee37..1557d94e3 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "github.com/gogo/status" "go.etcd.io/etcd/pkg/fileutil" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -21,8 +22,6 @@ import ( "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/gogo/status" - "github.com/kakao/varlog/internal/reportcommitter" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/container/set" @@ -111,7 +110,7 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository { commitC: make(chan *committedEntry, 4096), rnConfChangeC: make(chan raftpb.ConfChange, 1), rnProposeC: make(chan []byte), - reportQueue: make([]*mrpb.Report, 0, 1024), + reportQueue: mrpb.NewReportQueue(), runner: runner.New("mr", cfg.logger), sw: stopwaiter.New(), tmStub: tmStub, @@ -351,7 +350,7 @@ func (mr *RaftMetadataRepository) processReport(ctx context.Context) { if num > 0 { reports = mrpb.NewReports(mr.nodeID, time.Now()) reports.Reports = mr.reportQueue - mr.reportQueue = make([]*mrpb.Report, 0, 1024) + mr.reportQueue = mrpb.NewReportQueue() } mr.muReportQueue.Unlock() diff --git a/proto/mrpb/raft_entry.go b/proto/mrpb/raft_entry.go index 0f2a226b8..caecc30e7 100644 --- a/proto/mrpb/raft_entry.go +++ b/proto/mrpb/raft_entry.go @@ -22,7 +22,34 @@ func NewReports(nodeID types.NodeID, ts time.Time) *Reports { func (rs *Reports) Release() { if rs != nil { + rq := (ReportQueue)(rs.Reports) + rq.Release() *rs = Reports{} reportsPool.Put(rs) } } + +const ( + reportQueueSize = 1024 +) + +type ReportQueue []*Report + +var reportQueuePool = sync.Pool{ + New: func() any { + q := make(ReportQueue, 0, reportQueueSize) + return &q + }, +} + +func NewReportQueue() ReportQueue { + rq := reportQueuePool.Get().(*ReportQueue) + return *rq +} + +func (rq *ReportQueue) Release() { + if rq != nil { + *rq = (*rq)[0:0:reportQueueSize] + reportQueuePool.Put(rq) + } +} diff --git a/proto/mrpb/raft_entry_test.go b/proto/mrpb/raft_entry_test.go new file mode 100644 index 000000000..f88cbc6af --- /dev/null +++ b/proto/mrpb/raft_entry_test.go @@ -0,0 +1,84 @@ +package mrpb + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" +) + +func TestReports(t *testing.T) { + const nid = types.NodeID(1) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 1e4; i++ { + reports := NewReports(nid, time.Now()) + require.Empty(t, reports.Reports) + + queue := NewReportQueue() + require.Empty(t, queue) + require.Equal(t, reportQueueSize, cap(queue)) + numReports := rng.Intn(reportQueueSize*2) + 1 + for j := 0; j < numReports; j++ { + queue = append(queue, &Report{}) + } + require.Len(t, queue, numReports) + reports.Reports = queue + + reports.Release() + } +} + +func TestReportQueuePool(t *testing.T) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 1e4; i++ { + queue := NewReportQueue() + require.Empty(t, queue) + require.Equal(t, reportQueueSize, cap(queue)) + + numReports := rng.Intn(reportQueueSize*2) + 1 + for j := 0; j < numReports; j++ { + queue = append(queue, &Report{}) + } + require.Len(t, queue, numReports) + + queue.Release() + } +} + +func BenchmarkReportQueuePool(b *testing.B) { + const numReports = 128 + predefinedReports := make([]*Report, numReports) + for i := 0; i < numReports; i++ { + predefinedReports[i] = &Report{} + } + reports := &Reports{} + + reports.Reports = make([]*Report, 0, reportQueueSize) + b.ResetTimer() + b.Run("WithoutPool", func(b *testing.B) { + for n := 0; n < b.N; n++ { + for j := 0; j < numReports; j++ { + reports.Reports = append(reports.Reports, predefinedReports[j]) + } + reports.Reports = make([]*Report, 0, reportQueueSize) + } + }) + + reports.Reports = NewReportQueue() + b.ResetTimer() + b.Run("WithPool", func(b *testing.B) { + for n := 0; n < b.N; n++ { + for j := 0; j < numReports; j++ { + reports.Reports = append(reports.Reports, predefinedReports[j]) + } + rq := (ReportQueue)(reports.Reports) + rq.Release() + reports.Reports = NewReportQueue() + } + }) +}