Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AsyncBatchingWorkQueue instead of TaskQueue #72425

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Collections;
using Microsoft.CodeAnalysis.Common;
using Microsoft.CodeAnalysis.Host;
using Microsoft.CodeAnalysis.Host.Mef;
Expand All @@ -19,12 +20,13 @@
namespace Microsoft.CodeAnalysis.Diagnostics
{
[Export(typeof(IDiagnosticService)), Shared]
internal partial class DiagnosticService : IDiagnosticService
internal partial class DiagnosticService : IDiagnosticService, IDisposable
{
private const string DiagnosticsUpdatedEventName = "DiagnosticsUpdated";

private readonly EventMap _eventMap = new();
private readonly TaskQueue _eventQueue;
private readonly CancellationTokenSource _eventQueueCancellation = new();
private readonly AsyncBatchingWorkQueue<(BatchOperation operation, IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)> _eventQueue;

private readonly object _gate = new();
private readonly Dictionary<IDiagnosticUpdateSource, Dictionary<Workspace, Dictionary<object, Data>>> _map = [];
Expand All @@ -43,7 +45,17 @@ public DiagnosticService(
_updateSources = [];

// queue to serialize events.
_eventQueue = new TaskQueue(listenerProvider.GetListener(FeatureAttribute.DiagnosticService), TaskScheduler.Default);
_eventQueue = new AsyncBatchingWorkQueue<(BatchOperation operation, IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)>(
delay: TimeSpan.Zero,
CyrusNajmabadi marked this conversation as resolved.
Show resolved Hide resolved
ProcessEventsBatchAsync,
listenerProvider.GetListener(FeatureAttribute.DiagnosticService),
_eventQueueCancellation.Token);
}

private enum BatchOperation
{
DiagnosticsUpdated,
DiagnosticsCleared,
}

public event EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>> DiagnosticsUpdated
Expand All @@ -59,41 +71,59 @@ public event EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>> DiagnosticsUpd
}
}

private void RaiseDiagnosticsUpdated(IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)
void IDisposable.Dispose()
{
var ev = _eventMap.GetEventHandlers<EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>>>(DiagnosticsUpdatedEventName);

_eventQueue.ScheduleTask(DiagnosticsUpdatedEventName, () =>
{
var updatedArgsCollection = UpdateDataMap(source, argsCollection);
if (updatedArgsCollection.IsEmpty)
{
// there is no change, nothing to raise events for.
return;
}

ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.updatedArgsCollection), (source, updatedArgsCollection));
}, CancellationToken.None);
_eventQueueCancellation.Cancel();
}

private void RaiseDiagnosticsCleared(IDiagnosticUpdateSource source)
private ValueTask ProcessEventsBatchAsync(ImmutableSegmentedList<(BatchOperation operation, IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)> batch, CancellationToken cancellationToken)
{
var ev = _eventMap.GetEventHandlers<EventHandler<ImmutableArray<DiagnosticsUpdatedArgs>>>(DiagnosticsUpdatedEventName);

_eventQueue.ScheduleTask(DiagnosticsUpdatedEventName, () =>
foreach (var (operation, source, argsCollection) in batch)
{
using var argsBuilder = TemporaryArray<DiagnosticsUpdatedArgs>.Empty;
if (operation == BatchOperation.DiagnosticsUpdated)
{
var updatedArgsCollection = UpdateDataMap(source, argsCollection);
if (updatedArgsCollection.IsEmpty)
{
// there is no change, nothing to raise events for.
continue;
}

ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.updatedArgsCollection), (source, updatedArgsCollection));
}
else if (operation == BatchOperation.DiagnosticsCleared)
{
using var argsBuilder = TemporaryArray<DiagnosticsUpdatedArgs>.Empty;

if (!ClearDiagnosticsReportedBySource(source, ref argsBuilder.AsRef()))
{
// there is no change, nothing to raise events for.
continue;
}

if (!ClearDiagnosticsReportedBySource(source, ref argsBuilder.AsRef()))
// don't create event listener if it haven't created yet. if there is a diagnostic to remove
// listener should have already created since all events are done in the serialized queue
ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.args), (source, args: argsBuilder.ToImmutableAndClear()));
}
else
{
// there is no change, nothing to raise events for.
return;
throw ExceptionUtilities.UnexpectedValue(operation);
}
}

// don't create event listener if it haven't created yet. if there is a diagnostic to remove
// listener should have already created since all events are done in the serialized queue
ev.RaiseEvent(static (handler, arg) => handler(arg.source, arg.args), (source, args: argsBuilder.ToImmutableAndClear()));
}, CancellationToken.None);
return ValueTaskFactory.CompletedTask;
}

private void RaiseDiagnosticsUpdated(IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)
{
_eventQueue.AddWork((BatchOperation.DiagnosticsUpdated, source, argsCollection));
}

private void RaiseDiagnosticsCleared(IDiagnosticUpdateSource source)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not really understanding who is calling this.

Copy link
Member Author

@sharwell sharwell Mar 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

➡️ The caller is abstracted a few layers. There is an event hookup here:

{
_eventQueue.AddWork((BatchOperation.DiagnosticsCleared, source, ImmutableArray<DiagnosticsUpdatedArgs>.Empty));
}

private ImmutableArray<DiagnosticsUpdatedArgs> UpdateDataMap(IDiagnosticUpdateSource source, ImmutableArray<DiagnosticsUpdatedArgs> argsCollection)
Expand Down
Loading