Skip to content

Commit

Permalink
perf(metarepos): add a pool for []*mrpb.Report
Browse files Browse the repository at this point in the history
This change adds the pool for []*mrpb.Report, which is reportQueuePool.

```
BenchmarkReportQueuePool/WithoutPool-16         	  763526	      1573 ns/op	    8192 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithoutPool-16         	  745612	      1574 ns/op	    8192 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithoutPool-16         	  738848	      1569 ns/op	    8192 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithoutPool-16         	  765339	      1563 ns/op	    8192 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithoutPool-16         	  739354	      1567 ns/op	    8192 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithPool-16            	 4445330	       270.2 ns/op	      24 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithPool-16            	 4342182	       271.9 ns/op	      24 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithPool-16            	 4517366	       267.3 ns/op	      24 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithPool-16            	 4440441	       263.5 ns/op	      24 B/op	       1 allocs/op
BenchmarkReportQueuePool/WithPool-16            	 4666981	       263.1 ns/op	      24 B/op	       1 allocs/op

```
  • Loading branch information
ijsong committed Jul 18, 2023
1 parent ce1e927 commit afbb1ce
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 3 deletions.
4 changes: 2 additions & 2 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository {
commitC: make(chan *committedEntry, 4096),
rnConfChangeC: make(chan raftpb.ConfChange, 1),
rnProposeC: make(chan []byte),
reportQueue: make([]*mrpb.Report, 0, 1024),
reportQueue: newReportQueue(),
runner: runner.New("mr", cfg.logger),
sw: stopwaiter.New(),
tmStub: tmStub,
Expand Down Expand Up @@ -351,7 +351,7 @@ func (mr *RaftMetadataRepository) processReport(ctx context.Context) {
if num > 0 {
reports = mrpb.NewReports(mr.nodeID, time.Now())
reports.Reports = mr.reportQueue
mr.reportQueue = make([]*mrpb.Report, 0, 1024)
mr.reportQueue = newReportQueue()
}
mr.muReportQueue.Unlock()

Expand Down
30 changes: 30 additions & 0 deletions internal/metarepos/report_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package metarepos

import (
"sync"

"github.com/kakao/varlog/proto/mrpb"
)

const (
reportQueueSize = 1024
)

type reportQueue []*mrpb.Report

var reportQueuePool = sync.Pool{
New: func() any {
q := make(reportQueue, 0, reportQueueSize)
return &q
},
}

func newReportQueue() reportQueue {
q := reportQueuePool.Get().(*reportQueue)
return *q
}

func (rq *reportQueue) release() {
*rq = (*rq)[0:0:reportQueueSize]
reportQueuePool.Put(rq)
}
29 changes: 29 additions & 0 deletions internal/metarepos/report_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metarepos

import (
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/kakao/varlog/proto/mrpb"
)

func TestReportQueuePool(t *testing.T) {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))

for i := 0; i < 1e4; i++ {
queue := newReportQueue()
require.Empty(t, queue)
require.Equal(t, reportQueueSize, cap(queue))

numReports := rng.Intn(reportQueueSize*2) + 1
for j := 0; j < numReports; j++ {
queue = append(queue, &mrpb.Report{})
}
require.Len(t, queue, numReports)

queue.release()
}
}
27 changes: 26 additions & 1 deletion proto/mrpb/raft_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package mrpb

import (
"sync"
time "time"
"time"

"github.com/kakao/varlog/pkg/types"
)
Expand All @@ -22,7 +22,32 @@ func NewReports(nodeID types.NodeID, ts time.Time) *Reports {

func (rs *Reports) Release() {
if rs != nil {
rq := (ReportQueue)(rs.Reports)
rq.Release()
*rs = Reports{}
reportsPool.Put(rs)
}
}

const (
reportQueueSize = 1024
)

type ReportQueue []*Report

var reportQueuePool = sync.Pool{
New: func() any {
q := make(ReportQueue, 0, reportQueueSize)
return &q
},
}

func NewReportQueue() ReportQueue {
rq := reportQueuePool.Get().(*ReportQueue)
return *rq
}

func (rq *ReportQueue) Release() {
*rq = (*rq)[0:0:reportQueueSize]
reportQueuePool.Put(rq)
}
84 changes: 84 additions & 0 deletions proto/mrpb/raft_entry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package mrpb

import (
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/kakao/varlog/pkg/types"
)

func TestReports(t *testing.T) {
const nid = types.NodeID(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))

for i := 0; i < 1e4; i++ {
reports := NewReports(nid, time.Now())
require.Empty(t, reports.Reports)

queue := NewReportQueue()
require.Empty(t, queue)
require.Equal(t, reportQueueSize, cap(queue))
numReports := rng.Intn(reportQueueSize*2) + 1
for j := 0; j < numReports; j++ {
queue = append(queue, &Report{})
}
require.Len(t, queue, numReports)
reports.Reports = queue

reports.Release()
}
}

func TestReportQueuePool(t *testing.T) {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))

for i := 0; i < 1e4; i++ {
queue := NewReportQueue()
require.Empty(t, queue)
require.Equal(t, reportQueueSize, cap(queue))

numReports := rng.Intn(reportQueueSize*2) + 1
for j := 0; j < numReports; j++ {
queue = append(queue, &Report{})
}
require.Len(t, queue, numReports)

queue.Release()
}
}

func BenchmarkReportQueuePool(b *testing.B) {
const numReports = 128
predefinedReports := make([]*Report, numReports)
for i := 0; i < numReports; i++ {
predefinedReports[i] = &Report{}
}
reports := &Reports{}

reports.Reports = make([]*Report, 0, reportQueueSize)
b.ResetTimer()
b.Run("WithoutPool", func(b *testing.B) {
for n := 0; n < b.N; n++ {
for j := 0; j < numReports; j++ {
reports.Reports = append(reports.Reports, predefinedReports[j])
}
reports.Reports = make([]*Report, 0, reportQueueSize)
}
})

reports.Reports = NewReportQueue()
b.ResetTimer()
b.Run("WithPool", func(b *testing.B) {
for n := 0; n < b.N; n++ {
for j := 0; j < numReports; j++ {
reports.Reports = append(reports.Reports, predefinedReports[j])
}
rq := (ReportQueue)(reports.Reports)
rq.Release()
reports.Reports = NewReportQueue()
}
})
}

0 comments on commit afbb1ce

Please sign in to comment.