Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Updated protobuf package.
Browse files Browse the repository at this point in the history
Changed RPC processing to be sequential and to run on dedicated threads instead of the thread pool.
TODO: Fix broken tests.
  • Loading branch information
DJGosnell committed Jun 20, 2018
1 parent 2250aeb commit 695022f
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 119 deletions.
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue.Tests.Gui/Tests/MqBaseTestSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ protected byte[] RandomBytes(int len)
return val;
}

protected override void Send(byte[] buffer, int offset, int count)
protected override void Send(byte[] buffer, int offset, int count, bool pad)
{
base.Send(buffer, offset, count);
base.Send(buffer, offset, count, pad);
Interlocked.Add(ref TotalSent, count);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
<OutputPath>bin\Nuget\</OutputPath>
</PropertyGroup>
<ItemGroup>
<Reference Include="protobuf-net, Version=2.3.12.0, Culture=neutral, PublicKeyToken=257b51d87d2e4d67, processorArchitecture=MSIL">
<HintPath>..\packages\protobuf-net.2.3.12\lib\net40\protobuf-net.dll</HintPath>
<Reference Include="protobuf-net, Version=2.3.13.0, Culture=neutral, PublicKeyToken=257b51d87d2e4d67, processorArchitecture=MSIL">
<HintPath>..\packages\protobuf-net.2.3.13\lib\net40\protobuf-net.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue.Tests.Performance/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static void Main(string[] args)
Console.WriteLine($"DMQPerf.exe {string.Join(" ", args)}");

Console.WriteLine("MQ Performance tests.\r\n");
new MqPerformanceTest().StartTest();
//new MqPerformanceTest().StartTest();

Console.WriteLine("RPC Performance tests.\r\n");
new RpcPerformanceTest(args);
Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue.Tests.Performance/packages.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="protobuf-net" version="2.3.12" targetFramework="net46" />
<package id="protobuf-net" version="2.3.13" targetFramework="net46" />
</packages>
2 changes: 0 additions & 2 deletions src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,8 @@ public void Server_requests_authentication()
{
Server.Config.RequireAuthentication = true;


Client.Authenticate += (sender, e) => { TestComplete.Set(); };


StartAndWait();
}

Expand Down
4 changes: 2 additions & 2 deletions src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
<Reference Include="protobuf-net, Version=2.3.12.0, Culture=neutral, PublicKeyToken=257b51d87d2e4d67, processorArchitecture=MSIL">
<HintPath>..\packages\protobuf-net.2.3.12\lib\net40\protobuf-net.dll</HintPath>
<Reference Include="protobuf-net, Version=2.3.13.0, Culture=neutral, PublicKeyToken=257b51d87d2e4d67, processorArchitecture=MSIL">
<HintPath>..\packages\protobuf-net.2.3.13\lib\net40\protobuf-net.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
Expand Down
198 changes: 95 additions & 103 deletions src/DtronixMessageQueue/Rpc/MessageHandlers/RpcCallMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,117 +77,113 @@ public void MethodCancelAction(byte actionId, MqMessage message)
/// <param name="message">Message containing the Rpc call.</param>
private void ProcessRpcCallAction(byte actionId, MqMessage message)
{
// Execute the processing on the worker thread.
Task.Run(() =>
{
var messageType = (RpcCallMessageAction)actionId;
var messageType = (RpcCallMessageAction) actionId;

// Retrieve a serialization cache to work with.
var serialization = Session.SerializationCache.Get(message);
ushort recMessageReturnId = 0;
// Retrieve a serialization cache to work with.
var serialization = Session.SerializationCache.Get(message);
ushort recMessageReturnId = 0;

try
try
{
// Determine if this call has a return value.
if (messageType == RpcCallMessageAction.MethodCall)
{
// Determine if this call has a return value.
if (messageType == RpcCallMessageAction.MethodCall)
{
recMessageReturnId = serialization.MessageReader.ReadUInt16();
}

// Read the string service name, method and number of arguments.
var recServiceName = serialization.MessageReader.ReadString();
var recMethodName = serialization.MessageReader.ReadString();
var recArgumentCount = serialization.MessageReader.ReadByte();

// Get the method info from the cache.
var serviceMethod = Session.ServiceMethodCache.GetMethodInfo(recServiceName, recMethodName);


// Verify that the requested service exists.
if (serviceMethod == null || !_serviceInstances.ContainsKey(recServiceName))
throw new Exception($"Service '{recServiceName}' does not exist.");

ResponseWaitHandle cancellationWait = null;
recMessageReturnId = serialization.MessageReader.ReadUInt16();
}

// If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation.
if (recMessageReturnId != 0 && serviceMethod.HasCancellation)
{
cancellationWait = _remoteWaitOperations.CreateWaitHandle(recMessageReturnId);
// Read the string service name, method and number of arguments.
var recServiceName = serialization.MessageReader.ReadString();
var recMethodName = serialization.MessageReader.ReadString();
var recArgumentCount = serialization.MessageReader.ReadByte();

cancellationWait.TokenSource = new CancellationTokenSource();
cancellationWait.Token = cancellationWait.TokenSource.Token;
}
// Get the method info from the cache.
var serviceMethod = Session.ServiceMethodCache.GetMethodInfo(recServiceName, recMethodName);

// Setup the parameters to pass to the invoked method.
object[] parameters = new object[recArgumentCount + (cancellationWait == null ? 0 : 1)];

// Determine if we have any parameters to pass to the invoked method.
if (recArgumentCount > 0)
{
serialization.PrepareDeserializeReader();
// Verify that the requested service exists.
if (serviceMethod == null || !_serviceInstances.ContainsKey(recServiceName))
throw new Exception($"Service '{recServiceName}' does not exist.");

// Parse each parameter to the parameter list.
for (var i = 0; i < recArgumentCount; i++)
parameters[i] = serialization.DeserializeFromReader(serviceMethod.ParameterTypes[i], i);
}
ResponseWaitHandle cancellationWait = null;

// Add the cancellation token to the parameters.
if (cancellationWait != null)
parameters[parameters.Length - 1] = cancellationWait.Token;
// If the past parameter is a cancellation token, setup a return wait for this call to allow for remote cancellation.
if (recMessageReturnId != 0 && serviceMethod.HasCancellation)
{
cancellationWait = _remoteWaitOperations.CreateWaitHandle(recMessageReturnId);

cancellationWait.TokenSource = new CancellationTokenSource();
cancellationWait.Token = cancellationWait.TokenSource.Token;
}

object returnValue;
try
{
// Invoke the requested method.
returnValue = serviceMethod.Invoke(_serviceInstances[recServiceName], parameters);
}
catch (Exception ex)
{
// Determine if this method was waited on. If it was and an exception was thrown,
// Let the recipient session know an exception was thrown.
if (recMessageReturnId != 0 &&
ex.InnerException?.GetType() != typeof(OperationCanceledException))
{
SendRpcException(serialization, ex, recMessageReturnId);
}
return;
}
finally
{
_remoteWaitOperations.Remove(recMessageReturnId);
}
// Setup the parameters to pass to the invoked method.
var parameters = new object[recArgumentCount + (cancellationWait == null ? 0 : 1)];

// Determine if we have any parameters to pass to the invoked method.
if (recArgumentCount > 0)
{
serialization.PrepareDeserializeReader();

// Determine what to do with the return value.
if (messageType == RpcCallMessageAction.MethodCall)
{
// Reset the stream.
serialization.Stream.SetLength(0);
serialization.MessageWriter.Clear();
// Parse each parameter to the parameter list.
for (var i = 0; i < recArgumentCount; i++)
parameters[i] = serialization.DeserializeFromReader(serviceMethod.ParameterTypes[i], i);
}

// Write the return id.
serialization.MessageWriter.Write(recMessageReturnId);
// Add the cancellation token to the parameters.
if (cancellationWait != null)
parameters[parameters.Length - 1] = cancellationWait.Token;

// Serialize the return value and add it to the stream.
serialization.SerializeToWriter(returnValue, 0);

// Send the return value message to the recipient.
SendHandlerMessage((byte)RpcCallMessageAction.MethodReturn,
serialization.MessageWriter.ToMessage(true));
}
object returnValue;
try
{
// Invoke the requested method.
returnValue = serviceMethod.Invoke(_serviceInstances[recServiceName], parameters);
}
catch (Exception ex)
{
// If an exception occurred, notify the recipient connection.
SendRpcException(serialization, ex, recMessageReturnId);
// Determine if this method was waited on. If it was and an exception was thrown,
// Let the recipient session know an exception was thrown.
if (recMessageReturnId != 0 &&
ex.InnerException?.GetType() != typeof(OperationCanceledException))
{
SendRpcException(serialization, ex, recMessageReturnId);
}
return;
}
finally
{
// Return the serialization to the cache to be reused.
Session.SerializationCache.Put(serialization);
_remoteWaitOperations.Remove(recMessageReturnId);
}
});


// Determine what to do with the return value.
if (messageType == RpcCallMessageAction.MethodCall)
{
// Reset the stream.
serialization.Stream.SetLength(0);
serialization.MessageWriter.Clear();

// Write the return id.
serialization.MessageWriter.Write(recMessageReturnId);

// Serialize the return value and add it to the stream.
serialization.SerializeToWriter(returnValue, 0);

// Send the return value message to the recipient.
SendHandlerMessage((byte) RpcCallMessageAction.MethodReturn,
serialization.MessageWriter.ToMessage(true));
}
}
catch (Exception ex)
{
// If an exception occurred, notify the recipient connection.
SendRpcException(serialization, ex, recMessageReturnId);
}
finally
{
// Return the serialization to the cache to be reused.
Session.SerializationCache.Put(serialization);
}
}


Expand All @@ -198,25 +194,21 @@ private void ProcessRpcCallAction(byte actionId, MqMessage message)
/// <param name="message">Message containing the frames for the return value.</param>
private void ProcessRpcReturnAction(byte actionId, MqMessage message)
{
// Execute the processing on the worker thread.
Task.Run(() =>
{
// Read the return Id.
var returnId = message[0].ReadUInt16(0);
// Read the return Id.
var returnId = message[0].ReadUInt16(0);


ResponseWaitHandle callWaitHandle = ProxyWaitOperations.Remove(returnId);
// Try to get the outstanding wait from the return id. If it does not exist, the has already completed.
if (callWaitHandle != null)
{
callWaitHandle.Message = message;
ResponseWaitHandle callWaitHandle = ProxyWaitOperations.Remove(returnId);
// Try to get the outstanding wait from the return id. If it does not exist, the has already completed.
if (callWaitHandle != null)
{
callWaitHandle.Message = message;

callWaitHandle.MessageActionId = actionId;
callWaitHandle.MessageActionId = actionId;

// Release the wait event.
callWaitHandle.ReturnResetEvent.Set();
}
});
// Release the wait event.
callWaitHandle.ReturnResetEvent.Set();
}
}

/// <summary>
Expand Down
11 changes: 11 additions & 0 deletions src/DtronixMessageQueue/Rpc/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class RpcClient<TSession, TConfig> : MqClient<TSession, TConfig>
/// </summary>
public RpcServerInfoDataContract ServerInfo { get; set; }

public ActionProcessor<Guid> RpcActionProcessor { get; }

/// <summary>
/// Called to send authentication data to the server.
/// </summary>
Expand All @@ -28,12 +30,21 @@ public class RpcClient<TSession, TConfig> : MqClient<TSession, TConfig>
/// </summary>
public event EventHandler<SessionEventArgs<TSession, TConfig>> Ready;



/// <summary>
/// Initializes a new instance of a Rpc client.
/// </summary>
/// <param name="config">Configurations for this client to use.</param>
public RpcClient(TConfig config) : base(config)
{
RpcActionProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
StartThreads = 1,
ThreadName = "RpcProcessor-Client"
});

RpcActionProcessor.Start();
}

protected override TSession CreateSession(System.Net.Sockets.Socket sessionSocket)
Expand Down
8 changes: 8 additions & 0 deletions src/DtronixMessageQueue/Rpc/RpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class RpcServer<TSession, TConfig> : MqServer<TSession, TConfig>
/// </summary>
public RpcServerInfoDataContract ServerInfo { get; }

public ActionProcessor<Guid> RpcActionProcessor { get; }

/// <summary>
/// Called to send authentication data to the server.
/// </summary>
Expand All @@ -44,6 +46,12 @@ public class RpcServer<TSession, TConfig> : MqServer<TSession, TConfig>
public RpcServer(TConfig config, RpcServerInfoDataContract serverInfo) : base(config)
{
ServerInfo = serverInfo ?? new RpcServerInfoDataContract();
RpcActionProcessor = new ActionProcessor<Guid>(new ActionProcessor<Guid>.Config
{
ThreadName = "RpcProcessor-Server"
});

RpcActionProcessor.Start();
}

/// <summary>
Expand Down
Loading

0 comments on commit 695022f

Please sign in to comment.