diff --git a/enginerequests.go b/enginerequests.go index ea77455..05d3247 100644 --- a/enginerequests.go +++ b/enginerequests.go @@ -214,6 +214,8 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan< // Since we need to wait for only the processing of this query's executions, we need a separate WaitGroup here // Overall MaxParallelExecutions evaluation is handled by e.executionPool wg := sync.WaitGroup{} + expandedMutex := sync.RWMutex{} + expandedMutex.RLock() for q, adapters := range expanded { wg.Add(1) // localize values for the closure below @@ -239,7 +241,16 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan< ) p.Go(func() { defer LogRecoverToReturn(ctx, "ExecuteQuery inner") - defer wg.Done() + defer func() { + // Delete our query from the map so that we can track which + // ones are still running + expandedMutex.Lock() + defer expandedMutex.Unlock() + delete(expanded, localQ) + + // Mark the work as done + wg.Done() + }() defer func() { if localQ.GetMethod() == sdp.QueryMethod_LIST { listExecutionPoolCount.Add(-1) @@ -285,6 +296,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan< }) }() } + expandedMutex.RUnlock() waitGroupDone := make(chan struct{}) go func() { @@ -310,12 +322,21 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan< return case <-time.After(longRunningAdaptersTimeout): // If we're here, then the wait group didn't finish in time - log.WithContext(ctx).WithFields(log.Fields{ - "ovm.query.uuid": query.GetUUID(), - "ovm.query.type": query.GetType(), - "ovm.query.scope": query.GetScope(), - "ovm.query.method": query.GetMethod().String(), - }).Errorf("Wait group still running %v after context cancelled", longRunningAdaptersTimeout) + expandedMutex.RLock() + for q, adapters := range expanded { + adapterNames := make([]string, len(adapters)) + for i, a := range adapters { + adapterNames[i] = a.Name() + } + log.WithContext(ctx).WithFields(log.Fields{ + "ovm.query.uuid": q.ParseUuid().String(), + "ovm.query.type": q.GetType(), + "ovm.query.scope": q.GetScope(), + "ovm.query.method": q.GetMethod().String(), + "ovm.query.adapters": adapterNames, + }).Errorf("Wait group still running %v after context cancelled", longRunningAdaptersTimeout) + } + expandedMutex.RUnlock() } } }()