Skip to content

Commit

Permalink
perf(metarepos): reuse mrpb.StorageNodeUncommitReport while changed
Browse files Browse the repository at this point in the history
This change ensures that an mrpb.StorageNodeUncommitReport is reused when it is replaced with a new one.
The previous PR, #446, introduced a pool for mrpb.StorageNodeUncommitReport. However, the old
StorageNodeUncommitReport stored in reportContext was not returned to the pool, and this PR fixes that.

To release the StorageNodeUncommitReport registered to reportContext, some methods of reportContext
are changed:

- (*reportContext).getReport returns a value of StorageNodeUncommitReport rather than a pointer. Because
  the caller receives a copy, the stored StorageNodeUncommitReport can be released back to the pool at any time.
- add (*reportContext).swapReport to set a new StorageNodeUncommitReport and get the old one
  simultaneously.
- getReport and swapReport return an additional boolean result indicating whether the returned report is
  valid (false means no report was stored and the returned value is the zero value).

BenchmarkSwapReport evaluates the performance improvements of this change.

- getAndSavePtr: Previous implementation.
- getAndSave: Call getReport and saveReport.
- swap: Call swapReport.

```
BenchmarkSwapReport/getAndSavePtr-16         	 5947423	       207.8 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6220454	       198.7 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6133489	       198.5 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6131910	       199.8 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 5592459	       201.6 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6180243	       200.1 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6172357	       199.2 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6211966	       198.6 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6095289	       198.3 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSavePtr-16         	 6043491	       197.6 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6143407	       200.6 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 5938540	       203.6 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6162986	       198.5 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6172598	       197.8 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6105918	       197.0 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6077460	       196.5 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 5953898	       197.8 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6131799	       199.9 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6139296	       200.5 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/getAndSave-16            	 6115653	       198.5 ns/op	      32 B/op	       1 allocs/op
BenchmarkSwapReport/swap-16                  	35501544	        33.83 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	35982945	        34.05 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	36518960	        34.21 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	34620783	        34.52 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	35967453	        33.88 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	36135454	        34.17 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	34492548	        35.16 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	35581888	        34.14 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	32028504	        33.78 ns/op	       0 B/op	       0 allocs/op
BenchmarkSwapReport/swap-16                  	36024037	        33.94 ns/op	       0 B/op	       0 allocs/op
```
  • Loading branch information
ijsong committed Jul 20, 2023
1 parent 22667c3 commit 519cd60
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 16 deletions.
34 changes: 27 additions & 7 deletions internal/metarepos/report_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,8 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse

report.Sort()

prevReport := rce.reportCtx.getReport()
rce.reportCtx.saveReport(report)
if prevReport == nil || rce.reportCtx.isExpire() {
prevReport, ok := rce.reportCtx.swapReport(report)
if !ok || rce.reportCtx.isExpire() {
rce.reportCtx.reload()
return report
}
Expand Down Expand Up @@ -746,8 +745,8 @@ func (rce *reportCollectExecutor) getClient(ctx context.Context) (reportcommitte
}

func (rce *reportCollectExecutor) getReportedVersion(lsID types.LogStreamID) (types.Version, bool) {
report := rce.reportCtx.getReport()
if report == nil {
report, ok := rce.reportCtx.getReport()
if !ok {
return types.InvalidVersion, false
}

Expand Down Expand Up @@ -922,11 +921,32 @@ func (rc *reportContext) saveReport(report *mrpb.StorageNodeUncommitReport) {
rc.report.UncommitReports = report.UncommitReports
}

func (rc *reportContext) getReport() *mrpb.StorageNodeUncommitReport {
// swapReport replaces the cached report with a copy built from newReport and
// returns the previously cached report by value, releasing the old pooled
// object back to its pool.
//
// The returned old report is a shallow struct copy taken before Release is
// called, so the caller may inspect it at any time. ok is false when no
// report had been cached yet; old is then the zero value.
//
// NOTE(review): old shallow-copies the slice header of UncommitReports, and
// the new cached report aliases newReport.UncommitReports. This assumes the
// pool discipline of mrpb (Release/NewStoragenodeUncommitReport) replaces the
// slice on reuse rather than mutating the old backing array — TODO confirm.
func (rc *reportContext) swapReport(newReport *mrpb.StorageNodeUncommitReport) (old mrpb.StorageNodeUncommitReport, ok bool) {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	if rc.report != nil {
		// Copy the struct value BEFORE releasing the pooled object;
		// the order is critical since Release hands the object back to
		// the pool for reuse.
		old = *rc.report
		ok = true
		rc.report.Release()
	}

	// Cache a fresh pooled report that aliases newReport's UncommitReports.
	rc.report = mrpb.NewStoragenodeUncommitReport(newReport.StorageNodeID)
	rc.report.UncommitReports = newReport.UncommitReports

	return old, ok
}

// getReport returns a shallow copy of the cached report by value, so the
// caller can keep using the result even after the cached object is later
// released back to its pool. ok reports whether a report has been cached;
// when ok is false the returned report is the zero value.
//
// (The stray pre-change diff line `return rc.report` that appeared inside
// this span was a leftover removed line and has been dropped.)
func (rc *reportContext) getReport() (report mrpb.StorageNodeUncommitReport, ok bool) {
	rc.mu.RLock()
	defer rc.mu.RUnlock()

	if rc.report != nil {
		// Copy the value rather than returning the pointer: the
		// pointed-to object may be released and reused after unlock.
		report = *rc.report
		ok = true
	}

	return report, ok
}

func (rc *reportContext) reload() {
Expand Down
87 changes: 78 additions & 9 deletions internal/metarepos/report_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func TestCommit(t *testing.T) {
trimVer := types.MaxVersion
reportCollector.mu.RLock()
for _, executor := range reportCollector.executors {
reports := executor.reportCtx.getReport()
reports, _ := executor.reportCtx.getReport()
for _, report := range reports.UncommitReports {
if !report.Version.Invalid() && report.Version < trimVer {
trimVer = report.Version
Expand Down Expand Up @@ -983,7 +983,8 @@ func TestCommitWithDelay(t *testing.T) {

// check report
So(testutil.CompareWaitN(10, func() bool {
return executor.reportCtx.getReport() != nil
_, ok := executor.reportCtx.getReport()
return ok
}), ShouldBeTrue)

dummySN := a.lookupClient(sn.StorageNodeID)
Expand All @@ -997,9 +998,11 @@ func TestCommitWithDelay(t *testing.T) {
reportCollector.Commit()

So(testutil.CompareWaitN(10, func() bool {
return executor.reportCtx.getReport().UncommitReports[0].Version == knownVer
report, ok := executor.reportCtx.getReport()
return ok && report.UncommitReports[0].Version == knownVer
}), ShouldBeTrue)
reportedVer := executor.reportCtx.getReport().UncommitReports[0].Version
report, _ := executor.reportCtx.getReport()
reportedVer := report.UncommitReports[0].Version

dummySN.DisableReport()

Expand All @@ -1019,7 +1022,8 @@ func TestCommitWithDelay(t *testing.T) {
}), ShouldBeTrue)

time.Sleep(10 * time.Millisecond)
So(executor.reportCtx.getReport().UncommitReports[0].Version, ShouldEqual, reportedVer)
report, _ = executor.reportCtx.getReport()
So(report.UncommitReports[0].Version, ShouldEqual, reportedVer)

Convey("set commit delay & enable report to trim during catchup", func() {
dummySN.SetCommitDelay(100 * time.Millisecond)
Expand All @@ -1029,8 +1033,8 @@ func TestCommitWithDelay(t *testing.T) {
dummySN.EnableReport()

So(testutil.CompareWaitN(10, func() bool {
reports := executor.reportCtx.getReport()
return reports.UncommitReports[0].Version == knownVer
reports, ok := executor.reportCtx.getReport()
return ok && reports.UncommitReports[0].Version == knownVer
}), ShouldBeTrue)

mr.trimGLS(knownVer)
Expand All @@ -1044,8 +1048,8 @@ func TestCommitWithDelay(t *testing.T) {
reportCollector.Commit()

So(testutil.CompareWaitN(10, func() bool {
reports := executor.reportCtx.getReport()
return reports.UncommitReports[0].Version == knownVer
reports, ok := executor.reportCtx.getReport()
return ok && reports.UncommitReports[0].Version == knownVer
}), ShouldBeTrue)
})
})
Expand Down Expand Up @@ -1185,3 +1189,68 @@ func TestReporterClientReconnect(t *testing.T) {
})
})
}

// testReportContextPtr mimics the previous pointer-based reportContext
// implementation. It exists only so BenchmarkSwapReport can compare the old
// pointer-returning approach against the value-based one.
type testReportContextPtr struct {
	report *mrpb.StorageNodeUncommitReport // last saved report; nil until the first saveReport
	mu     sync.RWMutex                    // guards report
}

// saveReport caches a fresh report built from the given one. The new cached
// report aliases report.UncommitReports rather than copying it.
func (rc *testReportContextPtr) saveReport(report *mrpb.StorageNodeUncommitReport) {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	fresh := mrpb.NewStoragenodeUncommitReport(report.StorageNodeID)
	fresh.UncommitReports = report.UncommitReports
	rc.report = fresh
}

// getReport returns the cached report pointer, or nil if nothing has been
// saved yet.
func (rc *testReportContextPtr) getReport() *mrpb.StorageNodeUncommitReport {
	rc.mu.RLock()
	cached := rc.report
	rc.mu.RUnlock()
	return cached
}

// BenchmarkSwapReport compares three strategies for replacing the report
// cached in reportContext:
//
//   - getAndSavePtr: the previous pointer-based implementation
//     (testReportContextPtr), which allocates a new report on every save.
//   - getAndSave: the value-based getReport followed by saveReport.
//   - swap: the combined swapReport, which releases the old pooled report
//     and therefore avoids the per-iteration allocation.
func BenchmarkSwapReport(b *testing.B) {
	rcPtr := &testReportContextPtr{}
	rc := &reportContext{}
	report := &mrpb.StorageNodeUncommitReport{
		StorageNodeID: 1,
		UncommitReports: []snpb.LogStreamUncommitReport{
			{
				LogStreamID:           2,
				UncommittedLLSNOffset: 3,
				UncommittedLLSNLength: 4,
				Version:               5,
				HighWatermark:         6,
			},
		},
	}

	// No b.ResetTimer between sub-benchmarks: each b.Run invocation runs
	// its closure with an independent timer, so resetting the parent's
	// timer here is a no-op. (The previous code called it three times.)
	b.Run("getAndSavePtr", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			old := rcPtr.getReport()
			rcPtr.saveReport(report)
			_ = old
		}
	})

	b.Run("getAndSave", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			old, ok := rc.getReport()
			rc.saveReport(report)
			_, _ = old, ok
		}
	})

	b.Run("swap", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			old, ok := rc.swapReport(report)
			_, _ = old, ok
		}
	})
}

0 comments on commit 519cd60

Please sign in to comment.