Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CPU is consumed by polling frequently when there is no subscriber #4735

Merged
merged 12 commits into from
Nov 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.Utilities;

namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;

public interface ICommunicationChannel : IDisposable
{
/// <summary>
/// Event raised when data is received on the communication channel.
/// </summary>
event EventHandler<MessageReceivedEventArgs> MessageReceived;
TrackableEvent<MessageReceivedEventArgs> MessageReceived { get; }

/// <summary>
/// Frames and sends the provided data over communication channel.
Expand All @@ -24,5 +27,66 @@ public interface ICommunicationChannel : IDisposable
/// Notification from server/client that data is available.
/// </summary>
/// <returns>A <see cref="Task"/> implying async nature of the function.</returns>
Task NotifyDataAvailable();
Task NotifyDataAvailable(CancellationToken cancellationToken);
}

#pragma warning disable CA1001 // Types that own disposable fields should be disposable
nohwnd marked this conversation as resolved.
Show resolved Hide resolved
public class TrackableEvent<T>
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
private readonly ManualResetEventSlim _slim;

internal event EventHandler<T>? Event;

public TrackableEvent()
{
_slim = new ManualResetEventSlim(Event != null);
}

public void Notify(object sender, T eventArgs, string traceDisplayName)
{
var e = Event;
if (e != null)
{
e.SafeInvoke(sender, eventArgs!, traceDisplayName);
}
}

public bool WaitForSubscriber(int timeoutMilliseconds, CancellationToken cancellationToken)
{
var _ = timeoutMilliseconds;
var __ = cancellationToken;
return Event != null;
//return _slim.Wait(timeoutMilliseconds, cancellationToken);
nohwnd marked this conversation as resolved.
Show resolved Hide resolved
}

public void Subscribe(EventHandler<T> eventHandler)
{
Event += eventHandler;
if (Event != null)
{
_slim.Set();
}
}

public void Unsubscribe(EventHandler<T> eventHandler)
{
Event -= eventHandler;
if (Event == null)
{
_slim.Reset();
}
}
}

internal class UnsubscribeToken : IDisposable
{
private readonly Action _unsubscribeCallback;
internal UnsubscribeToken(Action value) => _unsubscribeCallback = value;

public void Unsubscribe() => _unsubscribeCallback();
public void Dispose()
{
_unsubscribeCallback();
}
cvpoienaru marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities;

Expand Down Expand Up @@ -37,7 +37,7 @@ public LengthPrefixCommunicationChannel(Stream stream)
}

/// <inheritdoc />
public event EventHandler<MessageReceivedEventArgs>? MessageReceived;
public TrackableEvent<MessageReceivedEventArgs> MessageReceived { get; } = new TrackableEvent<MessageReceivedEventArgs>();

/// <inheritdoc />
public Task Send(string data)
Expand Down Expand Up @@ -71,7 +71,7 @@ public Task Send(string data)
}

/// <inheritdoc />
public Task NotifyDataAvailable()
public Task NotifyDataAvailable(CancellationToken cancellationToken)
{
try
{
Expand All @@ -85,14 +85,10 @@ public Task NotifyDataAvailable()
// Try read data even if no one is listening to the data stream. Some server
// implementations (like Sockets) depend on the read operation to determine if a
// connection is closed.
nohwnd marked this conversation as resolved.
Show resolved Hide resolved
if (MessageReceived != null)
if (MessageReceived.WaitForSubscriber(1000, cancellationToken))
{
var data = _reader.ReadString();
MessageReceived.SafeInvoke(this, new MessageReceivedEventArgs { Data = data }, "LengthPrefixCommunicationChannel: MessageReceived");
}
else
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message is unnecessary, and way too much detail. with prestart it is normal that client is waiting for his turn and this would just add yet another message to the log that would always repeat.

EqtTrace.Verbose("LengthPrefixCommunicationChannel.NotifyDataAvailable: New data are waiting to be received, but there is no subscriber to be notified. Not reading them from the stream.");
MessageReceived.Notify(this, new MessageReceivedEventArgs { Data = data }, "LengthPrefixCommunicationChannel: MessageReceived");
}
}
catch (ObjectDisposedException ex) when (!_reader.BaseStream.CanRead)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.Disconnect
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.DisconnectedEventArgs.Error.get -> System.Exception?
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.DisconnectedEventArgs.Error.set -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationChannel
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationChannel.MessageReceived -> System.EventHandler<Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.MessageReceivedEventArgs!>!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationChannel.NotifyDataAvailable() -> System.Threading.Tasks.Task!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationChannel.Send(string! data) -> System.Threading.Tasks.Task!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationEndPoint
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationEndPoint.Connected -> System.EventHandler<Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ConnectedEventArgs!>!
Expand Down Expand Up @@ -189,8 +187,6 @@ Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.JsonDataSerializer.Se
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.Dispose() -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.LengthPrefixCommunicationChannel(System.IO.Stream! stream) -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.MessageReceived -> System.EventHandler<Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.MessageReceivedEventArgs!>?
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.NotifyDataAvailable() -> System.Threading.Tasks.Task!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.Send(string! data) -> System.Threading.Tasks.Task!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Message
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Message.Message() -> void
Expand Down Expand Up @@ -392,3 +388,13 @@ static Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.JsonDataSerial
~static Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Resources.Resources.UnexpectedMessage.get -> string
~static Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Resources.Resources.VersionCheckFailed.get -> string
~static Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Resources.Resources.VersionCheckTimedout.get -> string
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<T>
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<T>.TrackableEvent() -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<T>.Unsubscribe(System.EventHandler<T>! eventHandler) -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<T>.WaitForSubscriber(int timeoutMilliseconds, System.Threading.CancellationToken cancellationToken) -> bool
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<T>.Notify(object! sender, T eventArgs, string! traceDisplayName) -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<T>.Subscribe(System.EventHandler<T>! eventHandler) -> void
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationChannel.MessageReceived.get -> Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.MessageReceivedEventArgs!>!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.MessageReceived.get -> Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.TrackableEvent<Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.MessageReceivedEventArgs!>!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces.ICommunicationChannel.NotifyDataAvailable(System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task!
Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.LengthPrefixCommunicationChannel.NotifyDataAvailable(System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task!
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
Expand Down Expand Up @@ -40,17 +41,30 @@ internal static Task MessageLoopAsync(
socketException);
}

// PERF: check if verbose is enabled once, and re-use for all calls in the tight loop below. The check for verbose is shows in perf traces
nohwnd marked this conversation as resolved.
Show resolved Hide resolved
// below and we are wasting resources re-checking when user does not have it open. Downside of this is that if you change the verbosity level
// during runtime (e.g. in VS options), you won't update here. Which is imho an okay tradeoff.
var isVerboseEnabled = EqtTrace.IsVerboseEnabled;

var sw = Stopwatch.StartNew();
// Set read timeout to avoid blocking receive raw message
while (channel != null && !cancellationToken.IsCancellationRequested)
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: Polling on remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);
if (isVerboseEnabled)
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: Polling on remoteEndPoint: {0} localEndPoint: {1} after {2} ms", remoteEndPoint, localEndPoint, sw.ElapsedMilliseconds);
sw.Restart();
}

try
{
if (client.Client.Poll(Streamreadtimeout, SelectMode.SelectRead))
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: NotifyDataAvailable remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);
channel.NotifyDataAvailable();
if (isVerboseEnabled)
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: NotifyDataAvailable remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);
}
channel.NotifyDataAvailable(cancellationToken);
}
}
catch (IOException ioException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private bool TrySetupMessageReceiver(
}

_onMessageReceived = onMessageReceived;
_channel.MessageReceived += _onMessageReceived;
_channel.MessageReceived.Subscribe(_onMessageReceived);

return true;
}
Expand Down Expand Up @@ -269,7 +269,7 @@ public void CheckVersionWithTestHost()

protocolNegotiated.Set();
};
_channel.MessageReceived += onMessageReceived;
_channel.MessageReceived.Subscribe(onMessageReceived);

try
{
Expand All @@ -290,7 +290,7 @@ public void CheckVersionWithTestHost()
}
finally
{
_channel.MessageReceived -= onMessageReceived;
_channel.MessageReceived.Unsubscribe(onMessageReceived);
}
}

Expand Down Expand Up @@ -516,9 +516,9 @@ public void Close()
/// <inheritdoc />
public void Dispose()
{
if (_channel != null)
if (_channel != null && _onMessageReceived != null)
{
_channel.MessageReceived -= _onMessageReceived;
_channel.MessageReceived.Unsubscribe(_onMessageReceived);
}

_communicationEndpoint.Stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public virtual void InitializeCommunication()
throw connectedArgs.Fault;
}
_channel = connectedArgs.Channel;
_channel.MessageReceived += OnMessageReceived;
_channel.MessageReceived.Subscribe(OnMessageReceived);
_requestSenderConnected.Set();
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public void SocketEndpointShouldNotifyChannelOnDataAvailable()
{
var message = string.Empty;
ManualResetEvent waitForMessage = new(false);
SetupChannel(out ConnectedEventArgs? _)!.MessageReceived += (s, e) =>
SetupChannel(out ConnectedEventArgs? _)!.MessageReceived.Subscribe((s, e) =>
{
message = e.Data;
waitForMessage.Set();
};
});

WriteData(Client!);

Expand Down
Loading