From 7120500369aa492f72d3d66a0744ce7d7a23dcc9 Mon Sep 17 00:00:00 2001
From: Dawid Rusnak
Date: Fri, 9 Aug 2024 17:19:11 +0200
Subject: [PATCH] fix: concurrency problems when having a lot of parallel updates (#5754)

* fix: panic: sync: WaitGroup is reused before previous Wait has returned

* fix: fatal error: concurrent map read and map write
---
 .../artifacts/cloud_uploader.go               |  3 +
 .../testworkflowcontroller/notifier.go        | 74 ++++++++++++++-----
 2 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go b/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go
index dcc6de57e86..62aa587da94 100644
--- a/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go
+++ b/cmd/testworkflow-toolkit/artifacts/cloud_uploader.go
@@ -41,6 +41,7 @@ type cloudUploader struct {
 	parallelism  int
 	error        atomic.Bool
 	reqEnhancers []CloudUploaderRequestEnhancer
+	waitMu       sync.Mutex
 }
 
 func (d *cloudUploader) Start() (err error) {
@@ -140,6 +141,8 @@ func (d *cloudUploader) Add(path string, file io.Reader, size int64) error {
 }
 
 func (d *cloudUploader) End() error {
+	d.waitMu.Lock()
+	defer d.waitMu.Unlock()
 	d.wg.Wait()
 	if d.error.Load() {
 		return fmt.Errorf("upload failed")
diff --git a/pkg/testworkflows/testworkflowcontroller/notifier.go b/pkg/testworkflows/testworkflowcontroller/notifier.go
index 1b27ae0e2dd..83e2985aeb1 100644
--- a/pkg/testworkflows/testworkflowcontroller/notifier.go
+++ b/pkg/testworkflows/testworkflowcontroller/notifier.go
@@ -26,9 +26,10 @@ type notifier struct {
 	scheduledAt time.Time
 	lastTs      map[string]time.Time
 
-	resultMu        sync.Mutex
-	resultCh        chan struct{}
-	resultScheduled bool
+	resultMu       sync.RWMutex
+	flushMu        sync.Mutex
+	flushCh        chan struct{}
+	flushScheduled bool
 }
 
 func (n *notifier) send(value Notification) {
@@ -47,7 +48,7 @@ func (n *notifier) error(err error) {
 	n.ch <- ChannelMessage[Notification]{Error: err}
 }
 
-func (n *notifier) GetLastTimestamp(ref string) time.Time {
+func (n *notifier) unsafeGetLastTimestamp(ref string) time.Time {
 	last := n.lastTs[ref]
 	if n.result.Steps[ref].FinishedAt.After(last) {
 		return n.result.Steps[ref].FinishedAt
@@ -61,37 +62,47 @@
 	return last
 }
 
+func (n *notifier) GetLastTimestamp(ref string) time.Time {
+	n.resultMu.RLock()
+	defer n.resultMu.RUnlock()
+	return n.unsafeGetLastTimestamp(ref)
+}
+
 func (n *notifier) RegisterTimestamp(ref string, t time.Time) {
 	if t.After(n.GetLastTimestamp(ref)) {
+		n.resultMu.Lock()
 		n.lastTs[ref] = t.UTC()
+		n.resultMu.Unlock()
 	}
 }
 
 func (n *notifier) Flush() {
-	n.resultMu.Lock()
-	defer n.resultMu.Unlock()
-	if !n.resultScheduled {
+	n.flushMu.Lock()
+	defer n.flushMu.Unlock()
+	if !n.flushScheduled {
 		return
 	}
+	n.resultMu.RLock()
+	defer n.resultMu.RUnlock()
 	n.send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
-	n.resultScheduled = false
+	n.flushScheduled = false
 }
 
 func (n *notifier) scheduleFlush() {
-	n.resultMu.Lock()
-	defer n.resultMu.Unlock()
+	n.flushMu.Lock()
+	defer n.flushMu.Unlock()
 
 	// Inform existing scheduler about the next result
-	if n.resultScheduled {
+	if n.flushScheduled {
 		select {
-		case n.resultCh <- struct{}{}:
+		case n.flushCh <- struct{}{}:
 		default:
 		}
 		return
 	}
 
 	// Run the scheduler
-	n.resultScheduled = true
+	n.flushScheduled = true
 	go func() {
 		flushTimer := time.NewTimer(FlushResultMaxTime)
 		flushTimerEnabled := false
@@ -111,7 +122,7 @@
 			case <-time.After(FlushResultTime):
 				n.Flush()
 				flushTimerEnabled = false
-			case <-n.resultCh:
case <-n.resultCh: + case <-n.flushCh: if !flushTimerEnabled { flushTimerEnabled = true flushTimer.Reset(FlushResultMaxTime) @@ -160,13 +171,14 @@ func (n *notifier) Event(ref string, ts time.Time, level, reason, message string } func (n *notifier) recompute() { - if !n.result.Initialization.FinishedAt.IsZero() && n.result.Initialization.FinishedAt.Before(n.GetLastTimestamp("")) { - n.result.Initialization.FinishedAt = n.GetLastTimestamp("") + lastTs := n.unsafeGetLastTimestamp("") + if !n.result.Initialization.FinishedAt.IsZero() && n.result.Initialization.FinishedAt.Before(lastTs) { + n.result.Initialization.FinishedAt = lastTs } for k := range n.result.Steps { - if !n.result.Steps[k].FinishedAt.IsZero() && n.result.Steps[k].FinishedAt.Before(n.GetLastTimestamp("")) { + if !n.result.Steps[k].FinishedAt.IsZero() && n.result.Steps[k].FinishedAt.Before(lastTs) { step := n.result.Steps[k] - step.FinishedAt = n.GetLastTimestamp("") + step.FinishedAt = lastTs n.result.Steps[k] = step } } @@ -205,6 +217,8 @@ func (n *notifier) queueStep(ref string, ts time.Time) { } func (n *notifier) Queue(ref string, ts time.Time) { + n.resultMu.Lock() + defer n.resultMu.Unlock() if ref == "" { n.queue(ts) } else if ref == InitContainerName { @@ -250,6 +264,9 @@ func (n *notifier) startStep(ref string, ts time.Time) { } func (n *notifier) Start(ref string, ts time.Time) { + n.resultMu.Lock() + defer n.resultMu.Unlock() + if ref == "" { n.start(ts) } else if ref == InitContainerName { @@ -260,18 +277,23 @@ func (n *notifier) Start(ref string, ts time.Time) { } func (n *notifier) Output(ref string, ts time.Time, output *instructions.Instruction) { + n.resultMu.RLock() if ref == InitContainerName { ref = "" } if _, ok := n.result.Steps[ref]; !ok && ref != "" { + n.resultMu.RUnlock() return } + n.resultMu.RUnlock() n.RegisterTimestamp(ref, ts) n.Flush() n.send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output}) } func (n *notifier) Finish(ts time.Time) { + n.resultMu.Lock() + defer n.resultMu.Unlock() if !n.result.FinishedAt.Before(ts) { return } @@ -280,6 +302,8 @@ func (n *notifier) Finish(ts time.Time) { } func (n *notifier) UpdateStepStatus(ref string, status testkube.TestWorkflowStepStatus) { + n.resultMu.Lock() + defer n.resultMu.Unlock() if _, ok := n.result.Steps[ref]; !ok || (n.result.Steps[ref].Status != nil || *n.result.Steps[ref].Status == status) { return } @@ -299,6 +323,8 @@ func (n *notifier) finishInit(status ContainerResultStep) { } func (n *notifier) IsAnyAborted() bool { + n.resultMu.RLock() + defer n.resultMu.RUnlock() if n.result.Initialization.Status != nil && *n.result.Initialization.Status == testkube.ABORTED_TestWorkflowStepStatus { return true } @@ -311,6 +337,8 @@ func (n *notifier) IsAnyAborted() bool { } func (n *notifier) IsFinished(ref string) bool { + n.resultMu.RLock() + defer n.resultMu.RUnlock() if ref == InitContainerName { return !n.result.Initialization.FinishedAt.IsZero() } @@ -318,6 +346,8 @@ func (n *notifier) IsFinished(ref string) bool { } func (n *notifier) FinishStep(ref string, status ContainerResultStep) { + n.resultMu.Lock() + defer n.resultMu.Unlock() if ref == InitContainerName { n.finishInit(status) return @@ -335,6 +365,8 @@ func (n *notifier) FinishStep(ref string, status ContainerResultStep) { } func (n *notifier) Pause(ref string, ts time.Time) { + n.resultMu.Lock() + defer n.resultMu.Unlock() if n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == testkube.PAUSED_TestWorkflowStepStatus { return } @@ -343,11 +375,15 @@ func (n 
 }
 
 func (n *notifier) Resume(ref string, ts time.Time) {
+	n.resultMu.Lock()
+	defer n.resultMu.Unlock()
 	n.result.PauseEnd(n.sig, n.scheduledAt, ref, ts)
 	n.emit()
 }
 
 func (n *notifier) GetStepResult(ref string) testkube.TestWorkflowStepResult {
+	n.resultMu.RLock()
+	defer n.resultMu.RUnlock()
 	if ref == InitContainerName {
 		return *n.result.Initialization
 	}
@@ -385,6 +421,6 @@ func newNotifier(ctx context.Context, signature []stage.Signature, scheduledAt t
 		result:      result,
 		lastTs:      make(map[string]time.Time),
 
-		resultCh:    make(chan struct{}, 1),
+		flushCh:     make(chan struct{}, 1),
 	}
 }
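
For context on the first fix: Go's sync.WaitGroup must not be reused while a previous Wait is still returning, so when uploads keep being added as several callers reach End() at once, wg.Wait() can fail with the "panic: sync: WaitGroup is reused before previous Wait has returned" quoted in the commit message. Serializing Wait() behind waitMu removes that overlap. Below is a minimal standalone sketch of the same pattern, not the Testkube code; the uploader and add names are hypothetical:

package main

import (
	"fmt"
	"sync"
)

type uploader struct {
	wg     sync.WaitGroup
	waitMu sync.Mutex // serializes Wait, mirroring cloudUploader.waitMu
}

// add schedules one upload-like task on its own goroutine.
func (u *uploader) add(task func()) {
	u.wg.Add(1)
	go func() {
		defer u.wg.Done()
		task()
	}()
}

// End waits for every scheduled task. Holding waitMu keeps End calls
// from overlapping, so no caller can start reusing the WaitGroup while
// another caller's Wait is still returning.
func (u *uploader) End() error {
	u.waitMu.Lock()
	defer u.waitMu.Unlock()
	u.wg.Wait()
	return nil
}

func main() {
	u := &uploader{}
	for i := 0; i < 100; i++ {
		n := i
		u.add(func() { _ = n * n })
	}
	if err := u.End(); err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	fmt.Println("all uploads finished")
}

Keeping the mutex only around Wait() leaves the Add/Done hot path lock-free, which matters when uploads run with high parallelism.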
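For context on the second fix: reading a Go map while another goroutine writes it is an unrecoverable runtime crash, and notifier.lastTs plus the shared result were touched from many goroutines, producing the "fatal error: concurrent map read and map write" named above. The patch makes resultMu a sync.RWMutex so frequent readers (GetLastTimestamp, IsFinished, GetStepResult) run in parallel while mutating calls (Queue, Start, FinishStep, RegisterTimestamp) take the exclusive lock, and a separate flushMu guards only the flush-scheduling state. A minimal sketch of that read/write discipline, with hypothetical tracker, get, and register names:

package main

import (
	"fmt"
	"sync"
	"time"
)

type tracker struct {
	mu     sync.RWMutex
	lastTs map[string]time.Time // shared state, like notifier.lastTs
}

// get takes the shared lock, so any number of readers may run at once.
func (t *tracker) get(ref string) time.Time {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.lastTs[ref]
}

// register takes the exclusive lock before writing; an unsynchronized
// write racing with get would crash the runtime with
// "fatal error: concurrent map read and map write".
func (t *tracker) register(ref string, ts time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ts.After(t.lastTs[ref]) {
		t.lastTs[ref] = ts.UTC()
	}
}

func main() {
	tr := &tracker{lastTs: map[string]time.Time{}}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tr.register("step-1", time.Now())
			_ = tr.get("step-1")
		}()
	}
	wg.Wait()
	fmt.Println("last timestamp for step-1:", tr.get("step-1"))
}

Splitting flushMu from resultMu also lets Flush() hold only the read lock while cloning the result, so flushing notifications no longer blocks concurrent status updates.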