Skip to content

Commit

Permalink
Added a check for deadlocked adapters
Browse files: browse the repository at this point in the history
  • Loading branch information
dylanratcliffe committed Dec 7, 2024
1 parent 83e47ed commit 855243b
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions enginerequests.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -214,6 +214,8 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
// We only need to wait for the executions belonging to this query, so a separate WaitGroup is used here.
// The overall MaxParallelExecutions evaluation is handled by e.executionPool.
wg := sync.WaitGroup{}
expandedMutex := sync.RWMutex{}
expandedMutex.RLock()
for q, adapters := range expanded {
wg.Add(1)
// localize values for the closure below
Expand All @@ -239,7 +241,16 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
)
p.Go(func() {
defer LogRecoverToReturn(ctx, "ExecuteQuery inner")
defer wg.Done()
defer func() {
// Delete our query from the map so that we can track which
// ones are still running
expandedMutex.Lock()
defer expandedMutex.Unlock()
delete(expanded, localQ)

// Mark the work as done
wg.Done()
}()
defer func() {
if localQ.GetMethod() == sdp.QueryMethod_LIST {
listExecutionPoolCount.Add(-1)
Expand Down Expand Up @@ -285,6 +296,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
})
}()
}
expandedMutex.RUnlock()

waitGroupDone := make(chan struct{})
go func() {
Expand All @@ -310,12 +322,21 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
return
case <-time.After(longRunningAdaptersTimeout):
// If we're here, then the wait group didn't finish in time
log.WithContext(ctx).WithFields(log.Fields{
"ovm.query.uuid": query.GetUUID(),
"ovm.query.type": query.GetType(),
"ovm.query.scope": query.GetScope(),
"ovm.query.method": query.GetMethod().String(),
}).Errorf("Wait group still running %v after context cancelled", longRunningAdaptersTimeout)
expandedMutex.RLock()
for q, adapters := range expanded {
adapterNames := make([]string, len(adapters))
for i, a := range adapters {
adapterNames[i] = a.Name()
}
log.WithContext(ctx).WithFields(log.Fields{
"ovm.query.uuid": q.ParseUuid().String(),
"ovm.query.type": q.GetType(),
"ovm.query.scope": q.GetScope(),
"ovm.query.method": q.GetMethod().String(),
"ovm.query.adapters": adapterNames,
}).Errorf("Wait group still running %v after context cancelled", longRunningAdaptersTimeout)
}
expandedMutex.RUnlock()
}
}
}()
Expand Down

0 comments on commit 855243b

Please sign in to comment.