Skip to content

Commit

Permalink
Use AsyncBatchingWorkQueue instead of TaskQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
sharwell committed Mar 11, 2024
1 parent 01b7c23 commit a66eace
Showing 1 changed file with 57 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Collections;
using Microsoft.CodeAnalysis.Common;
using Microsoft.CodeAnalysis.Host;
using Microsoft.CodeAnalysis.Host.Mef;
Expand All @@ -19,12 +20,13 @@
namespace Microsoft.CodeAnalysis.Diagnostics
{
[Export(typeof(IDiagnosticService)), Shared]
internal partial class DiagnosticService : IDiagnosticService
internal partial class DiagnosticService : IDiagnosticService, IDisposable
{
private const string DiagnosticsUpdatedEventName = "DiagnosticsUpdated";

private readonly EventMap _eventMap = new();
private readonly TaskQueue _eventQueue;
private readonly CancellationTokenSource _eventQueueCancellation = new();
private readonly AsyncBatchingWorkQueue<(BatchOperation operation, IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)> _eventQueue;

private readonly object _gate = new();
private readonly Dictionary<IDiagnosticUpdateSource, Dictionary<Workspace, Dictionary<object, Data>>> _map = [];
Expand All @@ -43,7 +45,17 @@ public DiagnosticService(
_updateSources = [];

// queue to serialize events.
_eventQueue = new TaskQueue(listenerProvider.GetListener(FeatureAttribute.DiagnosticService), TaskScheduler.Default);
_eventQueue = new AsyncBatchingWorkQueue<(BatchOperation operation, IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)>(
delay: TimeSpan.Zero,
ProcessEventsBatchAsync,
listenerProvider.GetListener(FeatureAttribute.DiagnosticService),
_eventQueueCancellation.Token);
}

private enum BatchOperation
{
DiagnosticsUpdated,
DiagnosticsCleared,
}

public event EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>> DiagnosticsUpdated
Expand All @@ -59,41 +71,59 @@ public event EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>> DiagnosticsUpd
}
}

private void RaiseDiagnosticsUpdated(IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)
void IDisposable.Dispose()
{
var ev = _eventMap.GetEventHandlers<EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>>>(DiagnosticsUpdatedEventName);

_eventQueue.ScheduleTask(DiagnosticsUpdatedEventName, () =>
{
var updatedArgsCollection = UpdateDataMap(source, argsCollection);
if (updatedArgsCollection.IsEmpty)
{
// there is no change, nothing to raise events for.
return;
}

ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.updatedArgsCollection), (source, updatedArgsCollection));
}, CancellationToken.None);
_eventQueueCancellation.Cancel();
}

private void RaiseDiagnosticsCleared(IDiagnosticUpdateSource source)
private ValueTask ProcessEventsBatchAsync(ImmutableSegmentedList<(BatchOperation operation, IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)> batch, CancellationToken cancellationToken)
{
var ev = _eventMap.GetEventHandlers<EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>>>(DiagnosticsUpdatedEventName);

_eventQueue.ScheduleTask(DiagnosticsUpdatedEventName, () =>
foreach (var (operation, source, argsCollection) in batch)
{
using var argsBuilder = TemporaryArray<DiagnosticsUpdatedArgs>.Empty;
if (operation == BatchOperation.DiagnosticsUpdated)
{
var updatedArgsCollection = UpdateDataMap(source, argsCollection);
if (updatedArgsCollection.IsEmpty)
{
// there is no change, nothing to raise events for.
continue;
}

ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.updatedArgsCollection), (source, updatedArgsCollection));
}
else if (operation == BatchOperation.DiagnosticsCleared)
{
using var argsBuilder = TemporaryArray<DiagnosticsUpdatedArgs>.Empty;

if (!ClearDiagnosticsReportedBySource(source, ref argsBuilder.AsRef()))
{
// there is no change, nothing to raise events for.
continue;
}

if (!ClearDiagnosticsReportedBySource(source, ref argsBuilder.AsRef()))
// don't create event listener if it haven't created yet. if there is a diagnostic to remove
// listener should have already created since all events are done in the serialized queue
ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.args), (source, args: argsBuilder.ToImmutableAndClear()));
}
else
{
// there is no change, nothing to raise events for.
return;
throw ExceptionUtilities.UnexpectedValue(operation);
}
}

// don't create event listener if it haven't created yet. if there is a diagnostic to remove
// listener should have already created since all events are done in the serialized queue
ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.args), (source, args: argsBuilder.ToImmutableAndClear()));
}, CancellationToken.None);
return ValueTaskFactory.CompletedTask;
}

private void RaiseDiagnosticsUpdated(IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)
{
_eventQueue.AddWork((BatchOperation.DiagnosticsUpdated, source, argsCollection));
}

private void RaiseDiagnosticsCleared(IDiagnosticUpdateSource source)
{
_eventQueue.AddWork((BatchOperation.DiagnosticsCleared, source, ImmutableArray<DiagnosticsUpdatedArgs>.Empty));
}

private ImmutableArray<DiagnosticsUpdatedArgs> UpdateDataMap(IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)
Expand Down

0 comments on commit a66eace

Please sign in to comment.