Skip to content

Commit

Permalink
Merge pull request #94 from jbrookssmokeball/master
Browse files Browse the repository at this point in the history
Ensure synchronous handling does not block new requests and allow clearing SyncInfoCache
  • Loading branch information
tylerje authored Dec 1, 2024
2 parents 7d3c406 + 94cd29e commit 394cc5e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 38 deletions.
5 changes: 5 additions & 0 deletions src/ServiceWire/StreamingChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public StreamingChannel(ISerializer serializer, ICompressor compressor)
_parameterTransferHelper = new ParameterTransferHelper(_serializer, _compressor);
}

public static void ClearCachedSyncInfo()
{
SyncInfoCache.Clear();
}

protected virtual IChannelIdentifier ChannelIdentifier { get; }

/// <summary>
Expand Down
89 changes: 51 additions & 38 deletions src/ServiceWire/TcpIp/TcpHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ private void Listen()
_acceptEventArg.AcceptSocket = null;
try
{
// If AcceptAsync returns false - it must be handled synchronously.
// https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.acceptasync#system-net-sockets-socket-acceptasync(system-net-sockets-socketasynceventargs)
if (!_listener.AcceptAsync(_acceptEventArg))
{
AcceptNewClient(_acceptEventArg);
AcceptNewClient(_acceptEventArg, true);
}
}
catch (Exception ex)
Expand All @@ -117,10 +119,10 @@ private void Listen()

private void acceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
AcceptNewClient(e);
AcceptNewClient(e, false);
}

private void AcceptNewClient(SocketAsyncEventArgs e)
private void AcceptNewClient(SocketAsyncEventArgs e, bool processRequestsOnTask)
{
try
{
Expand All @@ -130,54 +132,65 @@ private void AcceptNewClient(SocketAsyncEventArgs e)
return;
}

Socket activeSocket = null;
BufferedStream stream = null;
try
{
activeSocket = e.AcceptSocket;
Socket activeSocket = e.AcceptSocket;

// Signal the listening thread to continue.
_listenResetEvent.Set();
// Signal the listening thread to continue.
_listenResetEvent.Set();

stream = new BufferedStream(new NetworkStream(activeSocket), 8192);
base.ProcessRequest(stream);
if (processRequestsOnTask)
{
Task.Factory.StartNew(() => StartProcessingRequestsOnSocket(activeSocket), TaskCreationOptions.LongRunning);
}
else
{
StartProcessingRequestsOnSocket(activeSocket);
}
catch (Exception ex)
}
catch (Exception fatalException)
{
_log.Fatal("AcceptNewClient fatal error: {0}", fatalException.ToString().Flatten());
}
}

private void StartProcessingRequestsOnSocket(Socket activeSocket)
{
BufferedStream stream = null;
try
{
stream = new BufferedStream(new NetworkStream(activeSocket), 8192);
base.ProcessRequest(stream);
}
catch (Exception ex)
{
_log.Error("AcceptNewClient_ProcessRequest error: {0}", ex.ToString().Flatten());
}
finally
{
if (null != stream)
{
_log.Error("AcceptNewClient_ProcessRequest error: {0}", ex.ToString().Flatten());
stream.Close();
}
finally
if (null != activeSocket && activeSocket.Connected)
{
if (null != stream)
try
{
stream.Close();
activeSocket.Shutdown(SocketShutdown.Both);
}
if (null != activeSocket && activeSocket.Connected)
catch (Exception shutdownException)
{
try
{
activeSocket.Shutdown(SocketShutdown.Both);
}
catch (Exception shutdownException)
{
_log.Error("AcceptNewClient_ActiveSocketShutdown error: {0}", shutdownException.ToString().Flatten());
}
_log.Error("AcceptNewClient_ActiveSocketShutdown error: {0}", shutdownException.ToString().Flatten());
}

try
{
activeSocket.Close();
}
catch (Exception closeException)
{
_log.Error("AcceptNewClient_ActiveSocketClose error: {0}", closeException.ToString().Flatten());
}
try
{
activeSocket.Close();
}
catch (Exception closeException)
{
_log.Error("AcceptNewClient_ActiveSocketClose error: {0}", closeException.ToString().Flatten());
}
}
}
catch (Exception fatalException)
{
_log.Fatal("AcceptNewClient fatal error: {0}", fatalException.ToString().Flatten());
}
}

#region IDisposable Members
Expand Down

0 comments on commit 394cc5e

Please sign in to comment.