Skip to content

Commit

Permalink
fix: concurrency problems when having a lot of parallel updates (#5754)
Browse files Browse the repository at this point in the history
* fix: panic: sync: WaitGroup is reused before previous Wait has returned
* fix: fatal error: concurrent map read and map write
  • Loading branch information
rangoo94 committed Aug 9, 2024
1 parent f322802 commit 7120500
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 19 deletions.
3 changes: 3 additions & 0 deletions cmd/testworkflow-toolkit/artifacts/cloud_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type cloudUploader struct {
parallelism int
error atomic.Bool
reqEnhancers []CloudUploaderRequestEnhancer
waitMu sync.Mutex
}

func (d *cloudUploader) Start() (err error) {
Expand Down Expand Up @@ -140,6 +141,8 @@ func (d *cloudUploader) Add(path string, file io.Reader, size int64) error {
}

func (d *cloudUploader) End() error {
d.waitMu.Lock()
defer d.waitMu.Unlock()
d.wg.Wait()
if d.error.Load() {
return fmt.Errorf("upload failed")
Expand Down
74 changes: 55 additions & 19 deletions pkg/testworkflows/testworkflowcontroller/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type notifier struct {
scheduledAt time.Time
lastTs map[string]time.Time

resultMu sync.Mutex
resultCh chan struct{}
resultScheduled bool
resultMu sync.RWMutex
flushMu sync.Mutex
flushCh chan struct{}
flushScheduled bool
}

func (n *notifier) send(value Notification) {
Expand All @@ -47,7 +48,7 @@ func (n *notifier) error(err error) {
n.ch <- ChannelMessage[Notification]{Error: err}
}

func (n *notifier) GetLastTimestamp(ref string) time.Time {
func (n *notifier) unsafeGetLastTimestamp(ref string) time.Time {
last := n.lastTs[ref]
if n.result.Steps[ref].FinishedAt.After(last) {
return n.result.Steps[ref].FinishedAt
Expand All @@ -61,37 +62,47 @@ func (n *notifier) GetLastTimestamp(ref string) time.Time {
return last
}

// GetLastTimestamp returns the latest timestamp known for ref.
//
// It is the concurrency-safe counterpart of unsafeGetLastTimestamp: the
// lookup runs while holding the read side of resultMu, so it must not be
// called by code that already holds resultMu.
func (n *notifier) GetLastTimestamp(ref string) time.Time {
	n.resultMu.RLock()
	defer n.resultMu.RUnlock()

	latest := n.unsafeGetLastTimestamp(ref)
	return latest
}

// RegisterTimestamp records t (converted to UTC) as the last timestamp for
// ref, but only when t is later than the latest timestamp already known for
// that ref.
//
// The comparison and the store happen under a single write lock. Checking
// under a read lock and then re-acquiring the write lock would leave a window
// in which a concurrent caller could store a newer timestamp that this call
// would then overwrite with an older one.
//
// It must not be called by code that already holds resultMu.
func (n *notifier) RegisterTimestamp(ref string, t time.Time) {
	n.resultMu.Lock()
	defer n.resultMu.Unlock()

	// unsafeGetLastTimestamp is the lock-free variant; resultMu is held here.
	if t.After(n.unsafeGetLastTimestamp(ref)) {
		n.lastTs[ref] = t.UTC()
	}
}

func (n *notifier) Flush() {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if !n.resultScheduled {
n.flushMu.Lock()
defer n.flushMu.Unlock()
if !n.flushScheduled {
return
}
n.resultMu.RLock()
defer n.resultMu.RUnlock()
n.send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
n.resultScheduled = false
n.flushScheduled = false
}

func (n *notifier) scheduleFlush() {
n.resultMu.Lock()
defer n.resultMu.Unlock()
n.flushMu.Lock()
defer n.flushMu.Unlock()

// Inform existing scheduler about the next result
if n.resultScheduled {
if n.flushScheduled {
select {
case n.resultCh <- struct{}{}:
case n.flushCh <- struct{}{}:
default:
}
return
}

// Run the scheduler
n.resultScheduled = true
n.flushScheduled = true
go func() {
flushTimer := time.NewTimer(FlushResultMaxTime)
flushTimerEnabled := false
Expand All @@ -111,7 +122,7 @@ func (n *notifier) scheduleFlush() {
case <-time.After(FlushResultTime):
n.Flush()
flushTimerEnabled = false
case <-n.resultCh:
case <-n.flushCh:
if !flushTimerEnabled {
flushTimerEnabled = true
flushTimer.Reset(FlushResultMaxTime)
Expand Down Expand Up @@ -160,13 +171,14 @@ func (n *notifier) Event(ref string, ts time.Time, level, reason, message string
}

func (n *notifier) recompute() {
if !n.result.Initialization.FinishedAt.IsZero() && n.result.Initialization.FinishedAt.Before(n.GetLastTimestamp("")) {
n.result.Initialization.FinishedAt = n.GetLastTimestamp("")
lastTs := n.unsafeGetLastTimestamp("")
if !n.result.Initialization.FinishedAt.IsZero() && n.result.Initialization.FinishedAt.Before(lastTs) {
n.result.Initialization.FinishedAt = lastTs
}
for k := range n.result.Steps {
if !n.result.Steps[k].FinishedAt.IsZero() && n.result.Steps[k].FinishedAt.Before(n.GetLastTimestamp("")) {
if !n.result.Steps[k].FinishedAt.IsZero() && n.result.Steps[k].FinishedAt.Before(lastTs) {
step := n.result.Steps[k]
step.FinishedAt = n.GetLastTimestamp("")
step.FinishedAt = lastTs
n.result.Steps[k] = step
}
}
Expand Down Expand Up @@ -205,6 +217,8 @@ func (n *notifier) queueStep(ref string, ts time.Time) {
}

func (n *notifier) Queue(ref string, ts time.Time) {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if ref == "" {
n.queue(ts)
} else if ref == InitContainerName {
Expand Down Expand Up @@ -250,6 +264,9 @@ func (n *notifier) startStep(ref string, ts time.Time) {
}

func (n *notifier) Start(ref string, ts time.Time) {
n.resultMu.Lock()
defer n.resultMu.Unlock()

if ref == "" {
n.start(ts)
} else if ref == InitContainerName {
Expand All @@ -260,18 +277,23 @@ func (n *notifier) Start(ref string, ts time.Time) {
}

// Output publishes an output instruction for the given ref.
//
// InitContainerName is mapped to the empty ref. A non-empty ref that is not
// present in the result's steps is ignored. Otherwise the timestamp is
// registered, Flush is called, and a notification carrying the output is
// sent with the timestamp converted to UTC.
func (n *notifier) Output(ref string, ts time.Time, output *instructions.Instruction) {
	if ref == InitContainerName {
		ref = ""
	}

	// Only the step lookup needs the read lock; it is released before
	// RegisterTimestamp and Flush, which take locks of their own.
	n.resultMu.RLock()
	_, known := n.result.Steps[ref]
	n.resultMu.RUnlock()

	if !known && ref != "" {
		return
	}

	n.RegisterTimestamp(ref, ts)
	n.Flush()
	n.send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output})
}

func (n *notifier) Finish(ts time.Time) {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if !n.result.FinishedAt.Before(ts) {
return
}
Expand All @@ -280,6 +302,8 @@ func (n *notifier) Finish(ts time.Time) {
}

func (n *notifier) UpdateStepStatus(ref string, status testkube.TestWorkflowStepStatus) {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if _, ok := n.result.Steps[ref]; !ok || (n.result.Steps[ref].Status != nil || *n.result.Steps[ref].Status == status) {
return
}
Expand All @@ -299,6 +323,8 @@ func (n *notifier) finishInit(status ContainerResultStep) {
}

func (n *notifier) IsAnyAborted() bool {
n.resultMu.RLock()
defer n.resultMu.RUnlock()
if n.result.Initialization.Status != nil && *n.result.Initialization.Status == testkube.ABORTED_TestWorkflowStepStatus {
return true
}
Expand All @@ -311,13 +337,17 @@ func (n *notifier) IsAnyAborted() bool {
}

// IsFinished reports whether the step identified by ref has a non-zero
// FinishedAt. For InitContainerName the initialization result is inspected
// instead of the steps map. The check runs under the read side of resultMu.
func (n *notifier) IsFinished(ref string) bool {
	n.resultMu.RLock()
	defer n.resultMu.RUnlock()

	switch ref {
	case InitContainerName:
		return !n.result.Initialization.FinishedAt.IsZero()
	default:
		return !n.result.Steps[ref].FinishedAt.IsZero()
	}
}

func (n *notifier) FinishStep(ref string, status ContainerResultStep) {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if ref == InitContainerName {
n.finishInit(status)
return
Expand All @@ -335,6 +365,8 @@ func (n *notifier) FinishStep(ref string, status ContainerResultStep) {
}

func (n *notifier) Pause(ref string, ts time.Time) {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == testkube.PAUSED_TestWorkflowStepStatus {
return
}
Expand All @@ -343,11 +375,15 @@ func (n *notifier) Pause(ref string, ts time.Time) {
}

// Resume marks the end of a pause for ref at ts by calling PauseEnd on the
// result, then calls emit. Both steps run while holding the write side of
// resultMu.
func (n *notifier) Resume(ref string, ts time.Time) {
	n.resultMu.Lock()
	defer n.resultMu.Unlock()

	n.result.PauseEnd(n.sig, n.scheduledAt, ref, ts)

	n.emit()
}

func (n *notifier) GetStepResult(ref string) testkube.TestWorkflowStepResult {
n.resultMu.RLock()
defer n.resultMu.RUnlock()
if ref == InitContainerName {
return *n.result.Initialization
}
Expand Down Expand Up @@ -385,6 +421,6 @@ func newNotifier(ctx context.Context, signature []stage.Signature, scheduledAt t
result: result,
lastTs: make(map[string]time.Time),

resultCh: make(chan struct{}, 1),
flushCh: make(chan struct{}, 1),
}
}

0 comments on commit 7120500

Please sign in to comment.