From 94cd29ea76e26393cf074f0e7251abd2a3f064d8 Mon Sep 17 00:00:00 2001 From: Jason Brooks Date: Fri, 8 Nov 2024 15:45:10 +1100 Subject: [PATCH] Ensure synchronous handling does not block new requests and allow clearing SyncInfoCache --- src/ServiceWire/StreamingChannel.cs | 5 ++ src/ServiceWire/TcpIp/TcpHost.cs | 89 +++++++++++++++++------------ 2 files changed, 56 insertions(+), 38 deletions(-) diff --git a/src/ServiceWire/StreamingChannel.cs b/src/ServiceWire/StreamingChannel.cs index 6f7c6d2..de3ad51 100644 --- a/src/ServiceWire/StreamingChannel.cs +++ b/src/ServiceWire/StreamingChannel.cs @@ -28,6 +28,11 @@ public StreamingChannel(ISerializer serializer, ICompressor compressor) _parameterTransferHelper = new ParameterTransferHelper(_serializer, _compressor); } + public static void ClearCachedSyncInfo() + { + SyncInfoCache.Clear(); + } + protected virtual IChannelIdentifier ChannelIdentifier { get; } /// diff --git a/src/ServiceWire/TcpIp/TcpHost.cs b/src/ServiceWire/TcpIp/TcpHost.cs index 5b7015f..ead07d9 100644 --- a/src/ServiceWire/TcpIp/TcpHost.cs +++ b/src/ServiceWire/TcpIp/TcpHost.cs @@ -94,9 +94,11 @@ private void Listen() _acceptEventArg.AcceptSocket = null; try { + // If AcceptAsync returns false - it must be handled synchronously. + // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.acceptasync#system-net-sockets-socket-acceptasync(system-net-sockets-socketasynceventargs) if (!_listener.AcceptAsync(_acceptEventArg)) { - AcceptNewClient(_acceptEventArg); + AcceptNewClient(_acceptEventArg, true); } } catch (Exception ex) @@ -117,10 +119,10 @@ private void Listen() private void acceptEventArg_Completed(object sender, SocketAsyncEventArgs e) { - AcceptNewClient(e); + AcceptNewClient(e, false); } - private void AcceptNewClient(SocketAsyncEventArgs e) + private void AcceptNewClient(SocketAsyncEventArgs e, bool processRequestsOnTask) { try { @@ -130,54 +132,65 @@ private void AcceptNewClient(SocketAsyncEventArgs e) return; } - Socket activeSocket = null; - BufferedStream stream = null; - try - { - activeSocket = e.AcceptSocket; + Socket activeSocket = e.AcceptSocket; - // Signal the listening thread to continue. - _listenResetEvent.Set(); + // Signal the listening thread to continue. + _listenResetEvent.Set(); - stream = new BufferedStream(new NetworkStream(activeSocket), 8192); - base.ProcessRequest(stream); + if (processRequestsOnTask) + { + Task.Factory.StartNew(() => StartProcessingRequestsOnSocket(activeSocket), TaskCreationOptions.LongRunning); + } + else + { + StartProcessingRequestsOnSocket(activeSocket); } - catch (Exception ex) + } + catch (Exception fatalException) + { + _log.Fatal("AcceptNewClient fatal error: {0}", fatalException.ToString().Flatten()); + } + } + + private void StartProcessingRequestsOnSocket(Socket activeSocket) + { + BufferedStream stream = null; + try + { + stream = new BufferedStream(new NetworkStream(activeSocket), 8192); + base.ProcessRequest(stream); + } + catch (Exception ex) + { + _log.Error("AcceptNewClient_ProcessRequest error: {0}", ex.ToString().Flatten()); + } + finally + { + if (null != stream) { - _log.Error("AcceptNewClient_ProcessRequest error: {0}", ex.ToString().Flatten()); + stream.Close(); } - finally + if (null != activeSocket && activeSocket.Connected) { - if (null != stream) + try { - stream.Close(); + activeSocket.Shutdown(SocketShutdown.Both); } - if (null != activeSocket && activeSocket.Connected) + catch (Exception shutdownException) { - try - { - activeSocket.Shutdown(SocketShutdown.Both); - } - catch (Exception shutdownException) - { - _log.Error("AcceptNewClient_ActiveSocketShutdown error: {0}", shutdownException.ToString().Flatten()); - } + _log.Error("AcceptNewClient_ActiveSocketShutdown error: {0}", shutdownException.ToString().Flatten()); + } - try - { - activeSocket.Close(); - } - catch (Exception closeException) - { - _log.Error("AcceptNewClient_ActiveSocketClose error: {0}", closeException.ToString().Flatten()); - } + try + { + activeSocket.Close(); + } + catch (Exception closeException) + { + _log.Error("AcceptNewClient_ActiveSocketClose error: {0}", closeException.ToString().Flatten()); } } } - catch (Exception fatalException) - { - _log.Fatal("AcceptNewClient fatal error: {0}", fatalException.ToString().Flatten()); - } } #region IDisposable Members