From 519cd60f2e0bffdcba5ee4f2af4f57548fa49220 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 20 Jul 2023 23:09:09 +0900 Subject: [PATCH] perf(metarepos): reuse mrpb.StorageNodeUncommitReport while changed This change makes mrpb.StorageNodeUncommitReport is reused when it is replaced with a new one. Previous PR, #446, introduced a pool for mrpb.StorageNodeUncommitReport. However, the old StorageNodeUncommitReport stored to reportContext was not reused, and this PR improves it. To release the StorageNodeUncommitReport registered to reportContext, some methods of reportContext are changed: - (*reportContext).getReport returns a value of StorageNodeUncommitReport rather than a pointer. It makes releasing the StorageNodeUncommitReport happen at any time. - add (*reportContext).swapReport to set a new StorageNodeUncommitReport and get the old one simultaneously. - getReport and swapReport return the boolean result to indicate whether the result report is zero value. BenchmarkSwapReport evaluates the performance improvements of this change. - getAndSavePtr: Previous implementation. - getAndSave: Call getReport and saveReport. - swap: Call swapReport. ``` BenchmarkSwapReport/getAndSavePtr-16 5947423 207.8 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6220454 198.7 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6133489 198.5 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6131910 199.8 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 5592459 201.6 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6180243 200.1 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6172357 199.2 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6211966 198.6 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6095289 198.3 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSavePtr-16 6043491 197.6 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6143407 200.6 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 5938540 203.6 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6162986 198.5 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6172598 197.8 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6105918 197.0 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6077460 196.5 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 5953898 197.8 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6131799 199.9 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6139296 200.5 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/getAndSave-16 6115653 198.5 ns/op 32 B/op 1 allocs/op BenchmarkSwapReport/swap-16 35501544 33.83 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 35982945 34.05 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 36518960 34.21 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 34620783 34.52 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 35967453 33.88 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 36135454 34.17 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 34492548 35.16 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 35581888 34.14 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 32028504 33.78 ns/op 0 B/op 0 allocs/op BenchmarkSwapReport/swap-16 36024037 33.94 ns/op 0 B/op 0 allocs/op ``` --- internal/metarepos/report_collector.go | 34 ++++++-- internal/metarepos/report_collector_test.go | 87 ++++++++++++++++++--- 2 files changed, 105 insertions(+), 16 deletions(-) diff --git a/internal/metarepos/report_collector.go b/internal/metarepos/report_collector.go index 31740958b..8151eb75c 100644 --- a/internal/metarepos/report_collector.go +++ b/internal/metarepos/report_collector.go @@ -656,9 +656,8 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse report.Sort() - prevReport := rce.reportCtx.getReport() - rce.reportCtx.saveReport(report) - if prevReport == nil || rce.reportCtx.isExpire() { + prevReport, ok := rce.reportCtx.swapReport(report) + if !ok || rce.reportCtx.isExpire() { rce.reportCtx.reload() return report } @@ -746,8 +745,8 @@ func (rce *reportCollectExecutor) getClient(ctx context.Context) (reportcommitte } func (rce *reportCollectExecutor) getReportedVersion(lsID types.LogStreamID) (types.Version, bool) { - report := rce.reportCtx.getReport() - if report == nil { + report, ok := rce.reportCtx.getReport() + if !ok { return types.InvalidVersion, false } @@ -922,11 +921,32 @@ func (rc *reportContext) saveReport(report *mrpb.StorageNodeUncommitReport) { rc.report.UncommitReports = report.UncommitReports } -func (rc *reportContext) getReport() *mrpb.StorageNodeUncommitReport { +func (rc *reportContext) swapReport(newReport *mrpb.StorageNodeUncommitReport) (old mrpb.StorageNodeUncommitReport, ok bool) { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.report != nil { + old = *rc.report + ok = true + rc.report.Release() + } + + rc.report = mrpb.NewStoragenodeUncommitReport(newReport.StorageNodeID) + rc.report.UncommitReports = newReport.UncommitReports + + return old, ok +} + +func (rc *reportContext) getReport() (report mrpb.StorageNodeUncommitReport, ok bool) { rc.mu.RLock() defer rc.mu.RUnlock() - return rc.report + if rc.report != nil { + report = *rc.report + ok = true + } + + return report, ok } func (rc *reportContext) reload() { diff --git a/internal/metarepos/report_collector_test.go b/internal/metarepos/report_collector_test.go index 3c14566c7..75fe4a761 100644 --- a/internal/metarepos/report_collector_test.go +++ b/internal/metarepos/report_collector_test.go @@ -879,7 +879,7 @@ func TestCommit(t *testing.T) { trimVer := types.MaxVersion reportCollector.mu.RLock() for _, executor := range reportCollector.executors { - reports := executor.reportCtx.getReport() + reports, _ := executor.reportCtx.getReport() for _, report := range reports.UncommitReports { if !report.Version.Invalid() && report.Version < trimVer { trimVer = report.Version @@ -983,7 +983,8 @@ func TestCommitWithDelay(t *testing.T) { // check report So(testutil.CompareWaitN(10, func() bool { - return executor.reportCtx.getReport() != nil + _, ok := executor.reportCtx.getReport() + return ok }), ShouldBeTrue) dummySN := a.lookupClient(sn.StorageNodeID) @@ -997,9 +998,11 @@ func TestCommitWithDelay(t *testing.T) { reportCollector.Commit() So(testutil.CompareWaitN(10, func() bool { - return executor.reportCtx.getReport().UncommitReports[0].Version == knownVer + report, ok := executor.reportCtx.getReport() + return ok && report.UncommitReports[0].Version == knownVer }), ShouldBeTrue) - reportedVer := executor.reportCtx.getReport().UncommitReports[0].Version + report, _ := executor.reportCtx.getReport() + reportedVer := report.UncommitReports[0].Version dummySN.DisableReport() @@ -1019,7 +1022,8 @@ func TestCommitWithDelay(t *testing.T) { }), ShouldBeTrue) time.Sleep(10 * time.Millisecond) - So(executor.reportCtx.getReport().UncommitReports[0].Version, ShouldEqual, reportedVer) + report, _ = executor.reportCtx.getReport() + So(report.UncommitReports[0].Version, ShouldEqual, reportedVer) Convey("set commit delay & enable report to trim during catchup", func() { dummySN.SetCommitDelay(100 * time.Millisecond) @@ -1029,8 +1033,8 @@ func TestCommitWithDelay(t *testing.T) { dummySN.EnableReport() So(testutil.CompareWaitN(10, func() bool { - reports := executor.reportCtx.getReport() - return reports.UncommitReports[0].Version == knownVer + reports, ok := executor.reportCtx.getReport() + return ok && reports.UncommitReports[0].Version == knownVer }), ShouldBeTrue) mr.trimGLS(knownVer) @@ -1044,8 +1048,8 @@ func TestCommitWithDelay(t *testing.T) { reportCollector.Commit() So(testutil.CompareWaitN(10, func() bool { - reports := executor.reportCtx.getReport() - return reports.UncommitReports[0].Version == knownVer + reports, ok := executor.reportCtx.getReport() + return ok && reports.UncommitReports[0].Version == knownVer }), ShouldBeTrue) }) }) @@ -1185,3 +1189,68 @@ func TestReporterClientReconnect(t *testing.T) { }) }) } + +type testReportContextPtr struct { + report *mrpb.StorageNodeUncommitReport + mu sync.RWMutex +} + +func (rc *testReportContextPtr) saveReport(report *mrpb.StorageNodeUncommitReport) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.report = mrpb.NewStoragenodeUncommitReport(report.StorageNodeID) + rc.report.UncommitReports = report.UncommitReports +} + +func (rc *testReportContextPtr) getReport() *mrpb.StorageNodeUncommitReport { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.report +} + +func BenchmarkSwapReport(b *testing.B) { + rcPtr := &testReportContextPtr{} + rc := &reportContext{} + report := &mrpb.StorageNodeUncommitReport{ + StorageNodeID: 1, + UncommitReports: []snpb.LogStreamUncommitReport{ + { + LogStreamID: 2, + UncommittedLLSNOffset: 3, + UncommittedLLSNLength: 4, + Version: 5, + HighWatermark: 6, + }, + }, + } + + b.ResetTimer() + b.Run("getAndSavePtr", func(b *testing.B) { + for i := 0; i < b.N; i++ { + old := rcPtr.getReport() + rcPtr.saveReport(report) + _ = old + } + }) + + b.ResetTimer() + b.Run("getAndSave", func(b *testing.B) { + for i := 0; i < b.N; i++ { + old, ok := rc.getReport() + rc.saveReport(report) + _ = old + _ = ok + } + }) + + b.ResetTimer() + b.Run("swap", func(b *testing.B) { + for i := 0; i < b.N; i++ { + old, ok := rc.swapReport(report) + _ = old + _ = ok + } + }) +}