[WIP] Attempt to fix sync test flakiness (ava-labs#2760)
Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org>
Co-authored-by: Dan Laine <daniel.laine@avalabs.org>
3 people committed Mar 21, 2023
1 parent d93abe3 commit 1783891
Showing 2 changed files with 24 additions and 17 deletions.
18 changes: 13 additions & 5 deletions x/merkledb/node.go
@@ -171,12 +171,20 @@ func (n *node) removeChild(child *node) {
 	delete(n.children, child.key[len(n.key)])
 }
 
-// Returns a copy of [n].
+// clone Returns a copy of [n].
+// nodeBytes is intentionally not included because it can cause a race.
+// nodes being evicted by the cache can write nodeBytes,
+// so reading them during the cloning would be a data race.
 func (n *node) clone() *node {
-	result := *n
-	result.children = maps.Clone(n.children)
-	result.value = Clone(n.value)
-	return &result
+	return &node{
+		id:  n.id,
+		key: n.key,
+		dbNode: dbNode{
+			value:    Clone(n.value),
+			children: maps.Clone(n.children),
+		},
+		valueDigest: Clone(n.valueDigest),
+	}
 }
 
 // Returns the ProofNode representation of this node.
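For context, the race the new comment describes is the usual struct-copy hazard: a whole-struct copy such as result := *n reads every field of the node, including a lazily cached serialization that the cache may write during eviction. A minimal sketch of the two clone styles in Go, using a hypothetical cachedNode type rather than merkledb's actual node:

package main

import "sync"

// cachedNode is a hypothetical, simplified stand-in for merkledb's node;
// only the lazily cached serialization field matters for this sketch.
type cachedNode struct {
	key      []byte
	children map[byte]*cachedNode
	bytes    []byte // cached serialization; a cache may write this on eviction
}

// cloneByStructCopy copies every field, including bytes. If an eviction
// callback writes n.bytes concurrently, this read races with that write.
func cloneByStructCopy(n *cachedNode) *cachedNode {
	c := *n
	return &c
}

// cloneFieldByField rebuilds only the fields the copy needs and never
// touches bytes, so a concurrent writer of bytes cannot race with it.
func cloneFieldByField(n *cachedNode) *cachedNode {
	return &cachedNode{
		key:      append([]byte(nil), n.key...),
		children: n.children, // shallow copy is enough for the sketch
	}
}

func main() {
	n := &cachedNode{key: []byte("k")}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); n.bytes = []byte("serialized") }() // "eviction" writes bytes
	go func() { defer wg.Done(); _ = cloneFieldByField(n) }()       // clone never reads bytes
	wg.Wait()
}

Building the copy field by field, as the new clone does, simply never reads the cached-bytes field, so there is no read for a concurrent cache write to race with.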
23 changes: 11 additions & 12 deletions x/sync/syncmanager.go
@@ -155,7 +155,7 @@ func (m *StateSyncManager) StartSyncing(ctx context.Context) error {
 
 	// Add work item to fetch the entire key range.
 	// Note that this will be the first work item to be processed.
-	m.enqueueWork(newWorkItem(ids.Empty, nil, nil, lowPriority))
+	m.unprocessedWork.Insert(newWorkItem(ids.Empty, nil, nil, lowPriority))
 
 	m.syncing = true
 	ctx, m.cancelCtx = context.WithCancel(ctx)
@@ -285,9 +285,6 @@ func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem
 		return
 	}
 
-	m.workLock.Lock()
-	defer m.workLock.Unlock()
-
 	select {
 	case <-m.syncDoneChan:
 		// If we're closed, don't apply the proof.
@@ -343,11 +340,6 @@ func (m *StateSyncManager) getAndApplyRangeProof(ctx context.Context, workItem *
 		return
 	}
 
-	// TODO danlaine: Do we need this or can we
-	// grab just before touching [m.unprocessedWork]?
-	m.workLock.Lock()
-	defer m.workLock.Unlock()
-
 	select {
 	case <-m.syncDoneChan:
 		// If we're closed, don't apply the proof.
@@ -549,7 +541,7 @@ func (m *StateSyncManager) setError(err error) {
 }
 
 // Mark the range [start, end] as synced up to [rootID].
-// Assumes [m.workLock] is held.
+// Assumes [m.workLock] is not held.
 func (m *StateSyncManager) completeWorkItem(ctx context.Context, workItem *syncWorkItem, largestHandledKey []byte, rootID ids.ID, proofOfLargestKey []merkledb.ProofNode) {
 	// if the last key is equal to the end, then the full range is completed
 	if !bytes.Equal(largestHandledKey, workItem.end) {
@@ -576,6 +568,9 @@ func (m *StateSyncManager) completeWorkItem(ctx context.Context, workItem *syncW
 		zap.Binary("end", largestHandledKey),
 	)
 	if m.getTargetRoot() == rootID {
+		m.workLock.Lock()
+		defer m.workLock.Unlock()
+
 		m.processedWork.MergeInsert(newWorkItem(rootID, workItem.start, largestHandledKey, workItem.priority))
 	} else {
 		// the root has changed, so reinsert with high priority
@@ -586,9 +581,13 @@ func (m *StateSyncManager) completeWorkItem(ctx context.Context, workItem *syncW
 // Queue the given key range to be fetched and applied.
 // If there are sufficiently few unprocessed/processing work items,
 // splits the range into two items and queues them both.
-// Assumes [m.workLock] is held.
+// Assumes [m.workLock] is not held.
 func (m *StateSyncManager) enqueueWork(item *syncWorkItem) {
-	defer m.unprocessedWorkCond.Signal()
+	m.workLock.Lock()
+	defer func() {
+		m.workLock.Unlock()
+		m.unprocessedWorkCond.Signal()
+	}()
 
 	if m.processingWorkItems+m.unprocessedWork.Len() > 2*m.config.SimultaneousWorkLimit {
 		// There are too many work items already, don't split the range
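The enqueueWork change moves lock acquisition into the callee and signals the condition variable only after the unlock, so callers no longer need to hold the lock and a woken waiter is not immediately blocked on a mutex the signaler still holds. A minimal sketch of that pattern in Go, with a hypothetical workQueue standing in for the real StateSyncManager:

package main

import (
	"fmt"
	"sync"
)

// workQueue is a hypothetical, simplified stand-in for the sync manager's
// work tracking; the real StateSyncManager fields and types differ.
type workQueue struct {
	lock  sync.Mutex
	cond  *sync.Cond // signaled when unprocessed work is added
	items []string
}

func newWorkQueue() *workQueue {
	q := &workQueue{}
	q.cond = sync.NewCond(&q.lock)
	return q
}

// enqueue takes the lock itself, so callers must not hold it.
// The deferred function unlocks first and signals second.
func (q *workQueue) enqueue(item string) {
	q.lock.Lock()
	defer func() {
		q.lock.Unlock()
		q.cond.Signal()
	}()
	q.items = append(q.items, item)
}

// waitForWork blocks until at least one item is queued, then pops it.
func (q *workQueue) waitForWork() string {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

func main() {
	q := newWorkQueue()
	done := make(chan struct{})
	go func() { fmt.Println(q.waitForWork()); close(done) }()
	q.enqueue("range [nil, nil]")
	<-done
}

sync.Cond permits calling Signal without holding the associated lock, so signaling after the unlock is safe; the waiter's for-loop recheck of the queue length handles the case where the item is inserted before the waiter reaches Wait.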
