Skip to content

Commit

Permalink
Simplify task handling in MeteringCollector to avoid racy edge case (#…
Browse files Browse the repository at this point in the history
…4121)

Co-authored-by: Martin Taillefer <mataille@microsoft.com>
  • Loading branch information
geeknoid and Martin Taillefer authored Jun 27, 2023
1 parent 81af726 commit 5a8029d
Showing 1 changed file with 34 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,19 @@ public void Dispose()
}

_disposed = true;
}

_meterListener.Dispose();
_measurements.Clear();

// wake up anybody still waiting and inform them of the bad news: their horse is dead...
foreach (var w in _waiters)
{
_ = w.TaskSource.TrySetException(MakeObjectDisposedException());
}
_meterListener.Dispose();
_measurements.Clear();

_waiters.Clear();
// wake up anybody still waiting and inform them of the bad news: their horse is dead...
foreach (var w in _waiters)
{
// trigger the task from outside the lock
_ = w.TaskSource.TrySetException(MakeObjectDisposedException());
}

_waiters.Clear();
}

/// <summary>
Expand Down Expand Up @@ -230,14 +231,15 @@ public Task WaitForMeasurementsAsync(int minCount, CancellationToken cancellatio
{
lock (_measurements)
{
_waiters.Remove(w);
}

// trigger the task from outside the lock
#if NET6_0_OR_GREATER
w.TaskSource.TrySetCanceled(cancellationToken);
w.TaskSource.TrySetCanceled(cancellationToken);
#else
w.TaskSource.TrySetCanceled();
w.TaskSource.TrySetCanceled();
#endif

_waiters.Remove(w);
}
});
}

Expand Down Expand Up @@ -298,6 +300,7 @@ private void OnMeasurementRecorded(Instrument instrument, T measurement, ReadOnl
{
var m = new CollectedMeasurement<T>(measurement, tags, _timeProvider.GetUtcNow());

List<Waiter>? toBeWoken = null;
lock (_measurements)
{
if (!_disposed)
Expand All @@ -310,13 +313,23 @@ private void OnMeasurementRecorded(Instrument instrument, T measurement, ReadOnl
{
if (_measurements.Count >= _waiters[i].MinCount)
{
// we use TrySetResult since the task may already be in the Cancelled state due to a timeout.
_ = _waiters[i].TaskSource.TrySetResult(true);
toBeWoken ??= new();
toBeWoken.Add(_waiters[i]);
_waiters.RemoveAt(i);
}
}
}
}

if (toBeWoken != null)
{
// trigger the task from outside the lock
foreach (var w in toBeWoken)
{
// we use TrySetResult since the task may already be in the Cancelled state due to a timeout.
_ = w.TaskSource.TrySetResult(true);
}
}
}

private void ThrowIfDisposed()
Expand All @@ -341,8 +354,10 @@ public Waiter(int minCount)

public int MinCount { get; }

// The TCS is modified in a lock.
// Use RunContinuationsAsynchronously to ensure continuations don't run when holding the lock.
public TaskCompletionSource<bool> TaskSource { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
// NOTE: In order to avoid potential dead locks, this task should
// be completed when the main lock is not being held. Otherwise,
// application code being woken up by the task could potentially
// call back into the MetricCollector code and thus trigger a deadlock.
public TaskCompletionSource<bool> TaskSource { get; } = new();
}
}

0 comments on commit 5a8029d

Please sign in to comment.