diff --git a/internal/metarepos/report_collector.go b/internal/metarepos/report_collector.go index 31740958b..8151eb75c 100644 --- a/internal/metarepos/report_collector.go +++ b/internal/metarepos/report_collector.go @@ -656,9 +656,8 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse report.Sort() - prevReport := rce.reportCtx.getReport() - rce.reportCtx.saveReport(report) - if prevReport == nil || rce.reportCtx.isExpire() { + prevReport, ok := rce.reportCtx.swapReport(report) + if !ok || rce.reportCtx.isExpire() { rce.reportCtx.reload() return report } @@ -746,8 +745,8 @@ func (rce *reportCollectExecutor) getClient(ctx context.Context) (reportcommitte } func (rce *reportCollectExecutor) getReportedVersion(lsID types.LogStreamID) (types.Version, bool) { - report := rce.reportCtx.getReport() - if report == nil { + report, ok := rce.reportCtx.getReport() + if !ok { return types.InvalidVersion, false } @@ -922,11 +921,32 @@ func (rc *reportContext) saveReport(report *mrpb.StorageNodeUncommitReport) { rc.report.UncommitReports = report.UncommitReports } -func (rc *reportContext) getReport() *mrpb.StorageNodeUncommitReport { +func (rc *reportContext) swapReport(newReport *mrpb.StorageNodeUncommitReport) (old mrpb.StorageNodeUncommitReport, ok bool) { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.report != nil { + old = *rc.report + ok = true + rc.report.Release() + } + + rc.report = mrpb.NewStoragenodeUncommitReport(newReport.StorageNodeID) + rc.report.UncommitReports = newReport.UncommitReports + + return old, ok +} + +func (rc *reportContext) getReport() (report mrpb.StorageNodeUncommitReport, ok bool) { rc.mu.RLock() defer rc.mu.RUnlock() - return rc.report + if rc.report != nil { + report = *rc.report + ok = true + } + + return report, ok } func (rc *reportContext) reload() { diff --git a/internal/metarepos/report_collector_test.go b/internal/metarepos/report_collector_test.go index 3c14566c7..75fe4a761 100644 --- a/internal/metarepos/report_collector_test.go +++ b/internal/metarepos/report_collector_test.go @@ -879,7 +879,7 @@ func TestCommit(t *testing.T) { trimVer := types.MaxVersion reportCollector.mu.RLock() for _, executor := range reportCollector.executors { - reports := executor.reportCtx.getReport() + reports, _ := executor.reportCtx.getReport() for _, report := range reports.UncommitReports { if !report.Version.Invalid() && report.Version < trimVer { trimVer = report.Version @@ -983,7 +983,8 @@ func TestCommitWithDelay(t *testing.T) { // check report So(testutil.CompareWaitN(10, func() bool { - return executor.reportCtx.getReport() != nil + _, ok := executor.reportCtx.getReport() + return ok }), ShouldBeTrue) dummySN := a.lookupClient(sn.StorageNodeID) @@ -997,9 +998,11 @@ func TestCommitWithDelay(t *testing.T) { reportCollector.Commit() So(testutil.CompareWaitN(10, func() bool { - return executor.reportCtx.getReport().UncommitReports[0].Version == knownVer + report, ok := executor.reportCtx.getReport() + return ok && report.UncommitReports[0].Version == knownVer }), ShouldBeTrue) - reportedVer := executor.reportCtx.getReport().UncommitReports[0].Version + report, _ := executor.reportCtx.getReport() + reportedVer := report.UncommitReports[0].Version dummySN.DisableReport() @@ -1019,7 +1022,8 @@ func TestCommitWithDelay(t *testing.T) { }), ShouldBeTrue) time.Sleep(10 * time.Millisecond) - So(executor.reportCtx.getReport().UncommitReports[0].Version, ShouldEqual, reportedVer) + report, _ = executor.reportCtx.getReport() + So(report.UncommitReports[0].Version, ShouldEqual, reportedVer) Convey("set commit delay & enable report to trim during catchup", func() { dummySN.SetCommitDelay(100 * time.Millisecond) @@ -1029,8 +1033,8 @@ func TestCommitWithDelay(t *testing.T) { dummySN.EnableReport() So(testutil.CompareWaitN(10, func() bool { - reports := executor.reportCtx.getReport() - return reports.UncommitReports[0].Version == knownVer + reports, ok := executor.reportCtx.getReport() + return ok && reports.UncommitReports[0].Version == knownVer }), ShouldBeTrue) mr.trimGLS(knownVer) @@ -1044,8 +1048,8 @@ func TestCommitWithDelay(t *testing.T) { reportCollector.Commit() So(testutil.CompareWaitN(10, func() bool { - reports := executor.reportCtx.getReport() - return reports.UncommitReports[0].Version == knownVer + reports, ok := executor.reportCtx.getReport() + return ok && reports.UncommitReports[0].Version == knownVer }), ShouldBeTrue) }) }) @@ -1185,3 +1189,68 @@ func TestReporterClientReconnect(t *testing.T) { }) }) } + +type testReportContextPtr struct { + report *mrpb.StorageNodeUncommitReport + mu sync.RWMutex +} + +func (rc *testReportContextPtr) saveReport(report *mrpb.StorageNodeUncommitReport) { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.report = mrpb.NewStoragenodeUncommitReport(report.StorageNodeID) + rc.report.UncommitReports = report.UncommitReports +} + +func (rc *testReportContextPtr) getReport() *mrpb.StorageNodeUncommitReport { + rc.mu.RLock() + defer rc.mu.RUnlock() + + return rc.report +} + +func BenchmarkSwapReport(b *testing.B) { + rcPtr := &testReportContextPtr{} + rc := &reportContext{} + report := &mrpb.StorageNodeUncommitReport{ + StorageNodeID: 1, + UncommitReports: []snpb.LogStreamUncommitReport{ + { + LogStreamID: 2, + UncommittedLLSNOffset: 3, + UncommittedLLSNLength: 4, + Version: 5, + HighWatermark: 6, + }, + }, + } + + b.ResetTimer() + b.Run("getAndSavePtr", func(b *testing.B) { + for i := 0; i < b.N; i++ { + old := rcPtr.getReport() + rcPtr.saveReport(report) + _ = old + } + }) + + b.ResetTimer() + b.Run("getAndSave", func(b *testing.B) { + for i := 0; i < b.N; i++ { + old, ok := rc.getReport() + rc.saveReport(report) + _ = old + _ = ok + } + }) + + b.ResetTimer() + b.Run("swap", func(b *testing.B) { + for i := 0; i < b.N; i++ { + old, ok := rc.swapReport(report) + _ = old + _ = ok + } + }) +}