From 2df4bb96b971bbafde8e1fc21e108c16e2d3eee8 Mon Sep 17 00:00:00 2001
From: Devesh Sarda <32046390+sarda-devesh@users.noreply.github.com>
Date: Mon, 30 May 2022 15:25:44 -0700
Subject: [PATCH 01/77] Read exception being received but not thrown
---
.../MultiplexingStream.Channel.cs | 45 ++++++++-
.../MultiplexingStream.ControlCode.cs | 6 ++
src/Nerdbank.Streams/MultiplexingStream.cs | 94 ++++++++++++++++++-
.../MultiplexingStreamTests.cs | 22 +++++
4 files changed, 162 insertions(+), 5 deletions(-)
diff --git a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
index 099e92e3..1654922b 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
@@ -96,6 +96,11 @@ public class Channel : IDisposableObservable, IDuplexPipe
///
private bool isDisposed;
+ ///
+ /// Indicates whether we closed the writing channel due to an exception.
+ ///
+ private bool receivedContentWriteError;
+
///
/// The to use to get data to be transmitted over the .
///
@@ -449,8 +454,29 @@ internal async ValueTask OnContentAsync(FrameHeader header, ReadOnlySequence
/// Called by the when when it will not be writing any more data to the channel.
///
- internal void OnContentWritingCompleted()
+ /// If we are closing the writing channel due to us receiving an error, defaults to null.
+ internal void OnContentWritingCompleted(Exception? error = null)
{
+ if (this.receivedContentWriteError)
+ {
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Exiting from writing completed since it was already called");
+ }
+
+ // We received a content write error so we have already closed the channel
+ return;
+ }
+
+ if (error != null)
+ {
+ this.receivedContentWriteError = true;
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Exception {0} passed into writing complete", error);
+ }
+ }
+
this.DisposeSelfOnFailure(Task.Run(async delegate
{
if (!this.IsDisposed)
@@ -458,13 +484,13 @@ internal void OnContentWritingCompleted()
try
{
PipeWriter? writer = this.GetReceivedMessagePipeWriter();
- await writer.CompleteAsync().ConfigureAwait(false);
+ await writer.CompleteAsync(error).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
if (this.mxStreamIOWriter != null)
{
- await this.mxStreamIOWriter.CompleteAsync().ConfigureAwait(false);
+ await this.mxStreamIOWriter.CompleteAsync(error).ConfigureAwait(false);
}
}
}
@@ -472,7 +498,7 @@ internal void OnContentWritingCompleted()
{
if (this.mxStreamIOWriter != null)
{
- await this.mxStreamIOWriter.CompleteAsync().ConfigureAwait(false);
+ await this.mxStreamIOWriter.CompleteAsync(error).ConfigureAwait(false);
}
}
@@ -711,6 +737,11 @@ private async Task ProcessOutboundTransmissionsAsync()
break;
}
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.FrameReceived, "Received buffer of length {0} inside process outbound", result.Buffer.Length);
+ }
+
if (result.IsCanceled)
{
// We've been asked to cancel. Presumably the channel has been disposed.
@@ -785,6 +816,12 @@ private async Task ProcessOutboundTransmissionsAsync()
catch (Exception ex)
{
await this.mxStreamIOReader!.CompleteAsync(ex).ConfigureAwait(false);
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Caught exception when processing outbound data");
+ }
+
+ this.MultiplexingStream.OnChannelWritingError(this, ex);
throw;
}
finally
diff --git a/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs b/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
index 91a9b364..fb4bbdfa 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
@@ -44,6 +44,12 @@ internal enum ControlCode : byte
/// allowing them to send more data.
///
ContentProcessed,
+
+ ///
+ /// Sent when we encounter error writing data on a given channel and is sent before a
+ /// to indicate the reason for the content writing closure.
+ ///
+ ContentWritingError,
}
}
}
diff --git a/src/Nerdbank.Streams/MultiplexingStream.cs b/src/Nerdbank.Streams/MultiplexingStream.cs
index 00e4769f..e0a8bb0f 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.cs
@@ -187,6 +187,7 @@ private enum TraceEventId
FrameReceived,
FrameSentPayload,
FrameReceivedPayload,
+ WriteError,
///
/// Raised when content arrives for a channel that has been disposed locally, resulting in discarding the content.
@@ -827,6 +828,9 @@ private async Task ReadStreamAsync()
case ControlCode.ContentWritingCompleted:
this.OnContentWritingCompleted(header.RequiredChannelId);
break;
+ case ControlCode.ContentWritingError:
+ this.OnContentWritingError(header.RequiredChannelId, frame.Value.Payload);
+ break;
case ControlCode.ChannelTerminated:
await this.OnChannelTerminatedAsync(header.RequiredChannelId).ConfigureAwait(false);
break;
@@ -900,6 +904,58 @@ private void OnContentWritingCompleted(QualifiedChannelId channelId)
channel.OnContentWritingCompleted();
}
+ private void OnContentWritingError(QualifiedChannelId channelId, ReadOnlySequence message)
+ {
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Received Write Error from channel {0}", channelId);
+ }
+
+ Channel? channel;
+ lock (this.syncObject)
+ {
+ if (this.openChannels.ContainsKey(channelId))
+ {
+ channel = this.openChannels[channelId];
+ }
+ else
+ {
+ channel = null;
+ }
+ }
+
+ if (channel == null)
+ {
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Found no open channels {0}", channelId);
+ }
+
+ // This is not an open channel so ignore the error message
+ return;
+ }
+
+ if (channelId.Source == ChannelSource.Local && !channel.IsAccepted)
+ {
+ throw new MultiplexingProtocolException($"Remote party indicated error writing to channel {channelId} before accepting it.");
+ }
+
+ // First close the channel and then throw the exception
+ string errorMessage = Encoding.Unicode.GetString(message.ToArray());
+ Exception remoteException = new MultiplexingProtocolException($"Remote party indicated writing error: {errorMessage}");
+
+ if (!this.channelsPendingTermination.Contains(channelId))
+ {
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Calling write complete for {0}", channel);
+ }
+
+ // We haven't already sent a termination frame so close the channel
+ channel.OnContentWritingCompleted(remoteException);
+ }
+ }
+
private async ValueTask OnContentAsync(FrameHeader header, ReadOnlySequence payload, CancellationToken cancellationToken)
{
Channel channel;
@@ -1088,7 +1144,6 @@ private void OnChannelDisposed(Channel channel)
/// Indicates that the local end will not be writing any more data to this channel,
/// leading to the transmission of a frame being sent for this channel.
///
- /// The channel whose writing has finished.
private void OnChannelWritingCompleted(Channel channel)
{
Requires.NotNull(channel, nameof(channel));
@@ -1102,6 +1157,43 @@ private void OnChannelWritingCompleted(Channel channel)
}
}
+ ///
+ /// Indicate that the local end encountered an error writing data to this channel,
+ /// leading to the transmission of a frame being sent to this channel.
+ ///
+ /// The channel we encountered writing the message to.
+ /// The error we encountered when trying to write to the channel.
+ private void OnChannelWritingError(Channel channel, Exception error)
+ {
+ Requires.NotNull(channel, nameof(channel));
+ lock (this.syncObject)
+ {
+ // Only inform the remote side if this channel has not already been terminated.
+ if (!this.channelsPendingTermination.Contains(channel.QualifiedId) && this.openChannels.ContainsKey(channel.QualifiedId))
+ {
+ string errorMessage = error.Message;
+ byte[] messageBytes = Encoding.Unicode.GetBytes(errorMessage);
+ ReadOnlySequence messageToSend = new ReadOnlySequence(messageBytes);
+ FrameHeader header = new FrameHeader
+ {
+ Code = ControlCode.ContentWritingError,
+ ChannelId = channel.QualifiedId,
+ };
+ if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
+ {
+ this.TraceSource.TraceEvent(
+ TraceEventType.Information,
+ (int)TraceEventId.WriteError,
+ "Sending write error header {0} for channel {1}",
+ header,
+ channel);
+ }
+
+ this.SendFrame(header, messageToSend, CancellationToken.None);
+ }
+ }
+ }
+
private void SendFrame(ControlCode code, QualifiedChannelId channelId)
{
var header = new FrameHeader
diff --git a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
index 4a8c9f54..1a6c3c12 100644
--- a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
+++ b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
@@ -121,6 +121,28 @@ public async Task OfferReadOnlyDuplexPipe()
await Task.WhenAll(ch1.Completion, ch2.Completion).WithCancellation(this.TimeoutToken);
}
+ [Fact]
+ public async Task OfferPipeWithError()
+ {
+ try
+ {
+ // Prepare a readonly pipe that is already fully populated with data for the other end to read.
+ var pipe = new Pipe();
+ await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken);
+ var writeException = new NullReferenceException("Write Error exception");
+ pipe.Writer.Complete(writeException);
+
+ MultiplexingStream.Channel? ch1 = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(pipe.Reader) });
+ await this.WaitForEphemeralChannelOfferToPropagateAsync();
+ MultiplexingStream.Channel? ch2 = this.mx2.AcceptChannel(ch1.QualifiedId.Id);
+ ReadResult readResult = await ch2.Input.ReadAsync(this.TimeoutToken);
+ }
+ catch (Exception error)
+ {
+ this.Logger.WriteLine("Encountered error inside offer pipe with error: " + error.Message);
+ }
+ }
+
[Fact]
public async Task OfferReadOnlyPipe()
{
From 439906b983bb829c1046c6c3072726e9d5e24d30 Mon Sep 17 00:00:00 2001
From: Devesh Sarda <32046390+sarda-devesh@users.noreply.github.com>
Date: Mon, 30 May 2022 17:13:17 -0700
Subject: [PATCH 02/77] Basic Error Test
---
.../MultiplexingStream.Channel.cs | 21 ++++---------------
.../MultiplexingStreamTests.cs | 6 +++++-
2 files changed, 9 insertions(+), 18 deletions(-)
diff --git a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
index 1654922b..47ea6d71 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
@@ -96,11 +96,6 @@ public class Channel : IDisposableObservable, IDuplexPipe
///
private bool isDisposed;
- ///
- /// Indicates whether we closed the writing channel due to an exception.
- ///
- private bool receivedContentWriteError;
-
///
/// The to use to get data to be transmitted over the .
///
@@ -457,20 +452,9 @@ internal async ValueTask OnContentAsync(FrameHeader header, ReadOnlySequenceIf we are closing the writing channel due to us receiving an error, defaults to null.
internal void OnContentWritingCompleted(Exception? error = null)
{
- if (this.receivedContentWriteError)
- {
- if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
- {
- this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Exiting from writing completed since it was already called");
- }
-
- // We received a content write error so we have already closed the channel
- return;
- }
if (error != null)
{
- this.receivedContentWriteError = true;
if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.WriteError, "Exception {0} passed into writing complete", error);
@@ -896,7 +880,10 @@ private void Fault(Exception exception)
}
this.mxStreamIOReader?.Complete(exception);
- this.Dispose();
+ if (!this.IsDisposed)
+ {
+ this.Dispose();
+ }
}
private void DisposeSelfOnFailure(Task task)
diff --git a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
index 1a6c3c12..2951ace9 100644
--- a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
+++ b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
@@ -124,6 +124,7 @@ public async Task OfferReadOnlyDuplexPipe()
[Fact]
public async Task OfferPipeWithError()
{
+ bool errorThrown = false;
try
{
// Prepare a readonly pipe that is already fully populated with data for the other end to read.
@@ -139,8 +140,11 @@ public async Task OfferPipeWithError()
}
catch (Exception error)
{
- this.Logger.WriteLine("Encountered error inside offer pipe with error: " + error.Message);
+ this.Logger.WriteLine("Encountered error inside Offer Pipe with error: " + error.Message);
+ errorThrown = true;
}
+
+ Assert.True(errorThrown);
}
[Fact]
From 50aeb0f81d196fc833178bd6f3a49c66458ac756 Mon Sep 17 00:00:00 2001
From: Devesh Sarda <32046390+sarda-devesh@users.noreply.github.com>
Date: Mon, 30 May 2022 17:23:36 -0700
Subject: [PATCH 03/77] Update test to make sure error message is received
---
.../MultiplexingStreamTests.cs | 26 ++++++++++---------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
index 2951ace9..072e08f9 100644
--- a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
+++ b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
@@ -125,23 +125,25 @@ public async Task OfferReadOnlyDuplexPipe()
public async Task OfferPipeWithError()
{
bool errorThrown = false;
+ string errorMessage = "Hello World";
+
+ // Prepare a readonly pipe that is already fully populated with data for the other end to read.
+ var pipe = new Pipe();
+ await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken);
+ var writeException = new NullReferenceException(errorMessage);
+ pipe.Writer.Complete(writeException);
+
+ MultiplexingStream.Channel? ch1 = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(pipe.Reader) });
+ await this.WaitForEphemeralChannelOfferToPropagateAsync();
+ MultiplexingStream.Channel? ch2 = this.mx2.AcceptChannel(ch1.QualifiedId.Id);
+
try
{
- // Prepare a readonly pipe that is already fully populated with data for the other end to read.
- var pipe = new Pipe();
- await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken);
- var writeException = new NullReferenceException("Write Error exception");
- pipe.Writer.Complete(writeException);
-
- MultiplexingStream.Channel? ch1 = this.mx1.CreateChannel(new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(pipe.Reader) });
- await this.WaitForEphemeralChannelOfferToPropagateAsync();
- MultiplexingStream.Channel? ch2 = this.mx2.AcceptChannel(ch1.QualifiedId.Id);
ReadResult readResult = await ch2.Input.ReadAsync(this.TimeoutToken);
}
- catch (Exception error)
+ catch (MultiplexingProtocolException error)
{
- this.Logger.WriteLine("Encountered error inside Offer Pipe with error: " + error.Message);
- errorThrown = true;
+ errorThrown = error.Message.Contains(errorMessage);
}
Assert.True(errorThrown);
From 5410649d50a7ae647e4df4b3a4d54f3ee1ed2cd1 Mon Sep 17 00:00:00 2001
From: Devesh Sarda <32046390+sarda-devesh@users.noreply.github.com>
Date: Wed, 1 Jun 2022 16:10:57 -0700
Subject: [PATCH 04/77] Added while loop in error test
---
.../MultiplexingStreamTests.cs | 28 ++++++++++++++-----
1 file changed, 21 insertions(+), 7 deletions(-)
diff --git a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
index 072e08f9..336c374d 100644
--- a/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
+++ b/test/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
@@ -127,7 +127,7 @@ public async Task OfferPipeWithError()
bool errorThrown = false;
string errorMessage = "Hello World";
- // Prepare a readonly pipe that is already fully populated with data for the other end to read.
+ // Prepare a readonly pipe that is already fully populated with data but is completed with an exception
var pipe = new Pipe();
await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 }, this.TimeoutToken);
var writeException = new NullReferenceException(errorMessage);
@@ -137,13 +137,27 @@ public async Task OfferPipeWithError()
await this.WaitForEphemeralChannelOfferToPropagateAsync();
MultiplexingStream.Channel? ch2 = this.mx2.AcceptChannel(ch1.QualifiedId.Id);
- try
- {
- ReadResult readResult = await ch2.Input.ReadAsync(this.TimeoutToken);
- }
- catch (MultiplexingProtocolException error)
+ bool continueReading = true;
+ while (continueReading)
{
- errorThrown = error.Message.Contains(errorMessage);
+ try
+ {
+ ReadResult readResult = await ch2.Input.ReadAsync(this.TimeoutToken);
+ if (readResult.IsCanceled || readResult.IsCompleted)
+ {
+ continueReading = false;
+ }
+
+ ch2.Input.AdvanceTo(readResult.Buffer.End);
+ }
+ catch (MultiplexingProtocolException error)
+ {
+ errorThrown = error.Message.Contains(errorMessage);
+ if (errorThrown)
+ {
+ continueReading = false;
+ }
+ }
}
Assert.True(errorThrown);
From 80b899e47491dec8dc6e9c4fa4580b4821908a57 Mon Sep 17 00:00:00 2001
From: Devesh Sarda <32046390+sarda-devesh@users.noreply.github.com>
Date: Sun, 5 Jun 2022 00:01:18 -0700
Subject: [PATCH 05/77] Implemented basic interface for error serialization
---
.../MultiplexingStream.Channel.cs | 1 -
.../MultiplexingStream.ControlCode.cs | 3 +-
.../MultiplexingStream.WriteError.cs | 36 +++++++++++++++++++
src/Nerdbank.Streams/MultiplexingStream.cs | 6 ++--
src/Nerdbank.Streams/Nerdbank.Streams.csproj | 2 ++
.../netstandard2.0/PublicAPI.Unshipped.txt | 3 ++
test/IsolatedTestHost/IsolatedTestHost.csproj | 2 ++
7 files changed, 49 insertions(+), 4 deletions(-)
create mode 100644 src/Nerdbank.Streams/MultiplexingStream.WriteError.cs
diff --git a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
index 47ea6d71..32368536 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
@@ -452,7 +452,6 @@ internal async ValueTask OnContentAsync(FrameHeader header, ReadOnlySequenceIf we are closing the writing channel due to us receiving an error, defaults to null.
internal void OnContentWritingCompleted(Exception? error = null)
{
-
if (error != null)
{
if (this.TraceSource!.Switch.ShouldTrace(TraceEventType.Information))
diff --git a/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs b/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
index fb4bbdfa..94df33d3 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.ControlCode.cs
@@ -47,7 +47,8 @@ internal enum ControlCode : byte
///
/// Sent when we encounter error writing data on a given channel and is sent before a
- /// to indicate the reason for the content writing closure.
+ /// to indicate the reason
+ /// for the content writing closure.
///
ContentWritingError,
}
diff --git a/src/Nerdbank.Streams/MultiplexingStream.WriteError.cs b/src/Nerdbank.Streams/MultiplexingStream.WriteError.cs
new file mode 100644
index 00000000..08de0066
--- /dev/null
+++ b/src/Nerdbank.Streams/MultiplexingStream.WriteError.cs
@@ -0,0 +1,36 @@
+// Copyright (c) Andrew Arnott. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Nerdbank.Streams
+{
+ using MessagePack;
+
+ ///
+ /// Contains the nested type.
+ ///
+ public partial class MultiplexingStream
+ {
+ ///
+ /// A class containing information about a write error and which is sent to the
+ /// remote alongside .
+ ///
+ [MessagePackObject]
+ public class WriteError
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The error message we want to send to the receiver.
+ public WriteError(string message)
+ {
+ this.ErrorMessage = message;
+ }
+
+ ///
+ /// Gets the error message that we want to send to receiver.
+ ///
+ [Key(0)]
+ public string ErrorMessage { get; }
+ }
+ }
+}
diff --git a/src/Nerdbank.Streams/MultiplexingStream.cs b/src/Nerdbank.Streams/MultiplexingStream.cs
index e0a8bb0f..750f744e 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.cs
@@ -941,7 +941,8 @@ private void OnContentWritingError(QualifiedChannelId channelId, ReadOnlySequenc
}
// First close the channel and then throw the exception
- string errorMessage = Encoding.Unicode.GetString(message.ToArray());
+ WriteError errorClass = MessagePackSerializer.Deserialize(message);
+ string errorMessage = errorClass.ErrorMessage;
Exception remoteException = new MultiplexingProtocolException($"Remote party indicated writing error: {errorMessage}");
if (!this.channelsPendingTermination.Contains(channelId))
@@ -1172,7 +1173,8 @@ private void OnChannelWritingError(Channel channel, Exception error)
if (!this.channelsPendingTermination.Contains(channel.QualifiedId) && this.openChannels.ContainsKey(channel.QualifiedId))
{
string errorMessage = error.Message;
- byte[] messageBytes = Encoding.Unicode.GetBytes(errorMessage);
+ WriteError errorClass = new WriteError(errorMessage);
+ byte[] messageBytes = MessagePackSerializer.Serialize(errorClass);
ReadOnlySequence messageToSend = new ReadOnlySequence(messageBytes);
FrameHeader header = new FrameHeader
{
diff --git a/src/Nerdbank.Streams/Nerdbank.Streams.csproj b/src/Nerdbank.Streams/Nerdbank.Streams.csproj
index c223bca1..d895bab5 100644
--- a/src/Nerdbank.Streams/Nerdbank.Streams.csproj
+++ b/src/Nerdbank.Streams/Nerdbank.Streams.csproj
@@ -9,6 +9,8 @@
+
+
diff --git a/src/Nerdbank.Streams/netstandard2.0/PublicAPI.Unshipped.txt b/src/Nerdbank.Streams/netstandard2.0/PublicAPI.Unshipped.txt
index b00dc10d..207e2a19 100644
--- a/src/Nerdbank.Streams/netstandard2.0/PublicAPI.Unshipped.txt
+++ b/src/Nerdbank.Streams/netstandard2.0/PublicAPI.Unshipped.txt
@@ -1,4 +1,7 @@
Nerdbank.Streams.BufferWriterExtensions
+Nerdbank.Streams.MultiplexingStream.WriteError
+Nerdbank.Streams.MultiplexingStream.WriteError.ErrorMessage.get -> string!
+Nerdbank.Streams.MultiplexingStream.WriteError.WriteError(string! message) -> void
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
diff --git a/test/IsolatedTestHost/IsolatedTestHost.csproj b/test/IsolatedTestHost/IsolatedTestHost.csproj
index e6b5dd16..ac1cd87f 100644
--- a/test/IsolatedTestHost/IsolatedTestHost.csproj
+++ b/test/IsolatedTestHost/IsolatedTestHost.csproj
@@ -6,6 +6,8 @@
+
+
From e9b529f024260e5c3876f942b571603116c1f772 Mon Sep 17 00:00:00 2001
From: Devesh Sarda <32046390+sarda-devesh@users.noreply.github.com>
Date: Thu, 9 Jun 2022 14:46:25 -0700
Subject: [PATCH 06/77] Added in locking for Dispose in
MultiplexingStream.Channel and reverted back to not using message pack
---
.../MultiplexingStream.Channel.cs | 143 +++++++++++-------
.../MultiplexingStream.WriteError.cs | 2 -
src/Nerdbank.Streams/MultiplexingStream.cs | 6 +-
src/Nerdbank.Streams/Nerdbank.Streams.csproj | 2 -
.../MultiplexingStreamTests.cs | 5 +-
5 files changed, 88 insertions(+), 70 deletions(-)
diff --git a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
index 32368536..687679fc 100644
--- a/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
+++ b/src/Nerdbank.Streams/MultiplexingStream.Channel.cs
@@ -317,65 +317,75 @@ private long RemoteWindowRemaining
///
public void Dispose()
{
- if (!this.IsDisposed)
+
+ bool hasBeenDisposed;
+ lock (this.SyncObject)
{
- // The code in this delegate needs to happen in several branches including possibly asynchronously.
- // We carefully define it here with no closure so that the C# compiler generates a static field for the delegate
- // thus avoiding any extra allocations from reusing code in this way.
- Action