Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Added documentation.
Browse files Browse the repository at this point in the history
Started sub-threadding work on RPC calls.
  • Loading branch information
DJGosnell committed Sep 12, 2016
1 parent 750a423 commit 2af8c77
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 44 deletions.
25 changes: 25 additions & 0 deletions DtronixMessageQueue.Rpc.Tests/RpcClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,30 @@ public void Client_calls_proxy_method_sequential() {

StartAndWait();
}

[Fact]
public void Client_calls_proxy_method_and_canceles() {

Server.Connected += (sender, args) => {
args.Session.AddService<ICalculatorService>(new CalculatorService());
};


Client.Connected += (sender, args) => {
args.Session.AddProxy<ICalculatorService>(new CalculatorService());
var service = Client.Session.GetProxy<ICalculatorService>();
Stopwatch stopwatch = Stopwatch.StartNew();

int added_int = 0;
for (int i = 0; i < 10; i++) {
added_int = service.Add(added_int, 1);
}

Output.WriteLine($"{stopwatch.ElapsedMilliseconds}");
TestStatus.Set();
};

StartAndWait();
}
}
}
15 changes: 15 additions & 0 deletions DtronixMessageQueue.Rpc.Tests/Services/Server/CalculatorService.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System;
using System.Threading;

namespace DtronixMessageQueue.Rpc.Tests.Services.Server {
public class CalculatorService : MarshalByRefObject, ICalculatorService {
public string Name { get; } = "CalculatorService";
public SimpleRpcSession Session { get; set; }

public event EventHandler LongRunningTaskCanceled;

public int Add(int number_1, int number_2) {
return number_1 + number_2;
}
Expand All @@ -20,5 +23,17 @@ public int Multiply(int number_1, int number_2) {
public int Divide(int number_1, int number_2) {
return number_1/number_2;
}

public int LongRunningTask(int number_1, int number_2, CancellationToken token) {
ManualResetEventSlim mre = new ManualResetEventSlim();

mre.Wait(token);

if (mre.IsSet == false) {
LongRunningTaskCanceled?.Invoke(this, EventArgs.Empty);
}

return number_1 / number_2;
}
}
}
2 changes: 1 addition & 1 deletion DtronixMessageQueue.Rpc/DtronixMessageQueue.Rpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="BsonReadWriteStore.cs" />
<Compile Include="SerializationStore.cs" />
<Compile Include="IRemoteService.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RpcClient.cs" />
Expand Down
6 changes: 4 additions & 2 deletions DtronixMessageQueue.Rpc/RpcProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public override IMessage Invoke(IMessage msg) {
var method_call = msg as IMethodCallMessage;
var method_info = method_call.MethodBase as MethodInfo;

var store = session.ReadWriteStore.Get();
var store = session.Store.Get();

object[] arguments = method_call.Args;
CancellationToken cancellation_token = CancellationToken.None;
Expand Down Expand Up @@ -54,6 +54,7 @@ public override IMessage Invoke(IMessage msg) {

return_wait = session.CreateReturnCallWait();
store.MessageWriter.Write(return_wait.Id);
return_wait.Token = cancellation_token;
}

store.MessageWriter.Write(decorated.Name);
Expand All @@ -79,6 +80,7 @@ public override IMessage Invoke(IMessage msg) {

return_wait.ReturnResetEvent.Wait(return_wait.Token);

return_wait.Token.ThrowIfCancellationRequested();

try {
store.MessageReader.Message = return_wait.ReturnMessage;
Expand Down Expand Up @@ -106,7 +108,7 @@ public override IMessage Invoke(IMessage msg) {
}

} finally {
session.ReadWriteStore.Put(store);
session.Store.Put(store);
}
}
}
Expand Down
20 changes: 19 additions & 1 deletion DtronixMessageQueue.Rpc/RpcReturnCallWait.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
using System.Threading;
using System;
using System.Threading;

namespace DtronixMessageQueue.Rpc {
public class RpcReturnCallWait {
public ushort Id { get; set; }

/// <summary>
/// Reset event used to hold the requesting
/// </summary>
public ManualResetEventSlim ReturnResetEvent { get; set; }

/// <summary>
/// Message that the other session receives from the connection that is associated with the return value for this request.
/// </summary>
public MqMessage ReturnMessage { get; set; }

/// <summary>
/// Cancellation token for the request.
/// </summary>
public CancellationToken Token { get; set; }

/// <summary>
/// Contains the time that this call wait was created to check for timeouts.
/// </summary>
public DateTime Created { get; } = DateTime.UtcNow;
}
}
100 changes: 61 additions & 39 deletions DtronixMessageQueue.Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reflection;
using System.Runtime.Remoting.Proxies;
using System.Threading;
using System.Threading.Tasks;
using DtronixMessageQueue.Socket;
using ProtoBuf;
using ProtoBuf.Meta;
Expand All @@ -13,27 +14,37 @@ namespace DtronixMessageQueue.Rpc {
public class RpcSession<TSession> : MqSession<TSession>
where TSession : RpcSession<TSession>, new() {


/// <summary>
/// Current call Id wich gets incremented for each call return request.
/// </summary>
private int rpc_call_id;

private object rpc_call_id_lock = new object();
/// <summary>
/// Lock to increment and loop return ID.
/// </summary>
private readonly object rpc_call_id_lock = new object();

public BsonReadWriteStore ReadWriteStore { get; private set; }
/// <summary>
/// Store which contains instances of all classes for serialization and destabilization of data.
/// </summary>
public SerializationStore Store { get; private set; }

private ConcurrentDictionary<ushort, RpcReturnCallWait> return_call_wait =
new ConcurrentDictionary<ushort, RpcReturnCallWait>();
/// <summary>
/// Contains all outstanding call returns pending a return of data from the other end of the connection.
/// </summary>
private readonly ConcurrentDictionary<ushort, RpcReturnCallWait> return_call_wait = new ConcurrentDictionary<ushort, RpcReturnCallWait>();

private Dictionary<string, IRemoteService<TSession>> services = new Dictionary<string, IRemoteService<TSession>>();
private Dictionary<Type, IRemoteService<TSession>> remote_services_proxy = new Dictionary<Type, IRemoteService<TSession>>();
private Dictionary<Type, RealProxy> remote_service_realproxy = new Dictionary<Type, RealProxy>();
private readonly Dictionary<string, IRemoteService<TSession>> services = new Dictionary<string, IRemoteService<TSession>>();
private readonly Dictionary<Type, IRemoteService<TSession>> remote_services_proxy = new Dictionary<Type, IRemoteService<TSession>>();
private readonly Dictionary<Type, RealProxy> remote_service_realproxy = new Dictionary<Type, RealProxy>();


public RpcServer<TSession> Server { get; set; }

protected override void OnSetup() {
base.OnSetup();

ReadWriteStore = new BsonReadWriteStore((MqSocketConfig)Config);
Store = new SerializationStore((MqSocketConfig)Config);
}


Expand Down Expand Up @@ -89,7 +100,7 @@ public void AddService<T>(T instance) where T : IRemoteService<TSession>{


private void ProcessRpcCall(MqMessage message, IncomingMessageEventArgs<TSession> e, RpcMessageType message_type) {
var store = ReadWriteStore.Get();
var store = Store.Get();
ushort message_return_id = 0;
try {
store.MessageReader.Message = message;
Expand Down Expand Up @@ -128,49 +139,60 @@ private void ProcessRpcCall(MqMessage message, IncomingMessageEventArgs<TSession
PrefixStyle.Base128, i);
}
}
Task.Run(() => {
object return_value;
try {
return_value = method_info.Invoke(service, parameters);
} catch (Exception ex) {
SendRpcException(store, ex, message_return_id);
return;
}


var return_value = method_info.Invoke(service, parameters);

switch (message_type) {
case RpcMessageType.RpcCall:
store.Stream.SetLength(0);

switch (message_type) {
case RpcMessageType.RpcCall:
store.Stream.SetLength(0);
store.MessageWriter.Clear();
store.MessageWriter.Write((byte)RpcMessageType.RpcCallReturn);
store.MessageWriter.Write(message_return_id);

store.MessageWriter.Clear();
store.MessageWriter.Write((byte)RpcMessageType.RpcCallReturn);
store.MessageWriter.Write(message_return_id);
RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, return_value, return_value.GetType(), PrefixStyle.Base128, 0);

RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, return_value, return_value.GetType(), PrefixStyle.Base128, 0);
store.MessageWriter.Write(store.Stream.ToArray());

store.MessageWriter.Write(store.Stream.ToArray());
Send(store.MessageWriter.ToMessage(true));

Send(store.MessageWriter.ToMessage(true));
break;
case RpcMessageType.RpcCallNoReturn:
break;
}

break;
case RpcMessageType.RpcCallNoReturn:
break;
default:
throw new ArgumentOutOfRangeException(nameof(message_type), message_type, null);
}
Store.Put(store);
});

} catch (Exception ex) {
store.Stream.SetLength(0);
SendRpcException(store, ex, message_return_id);
Store.Put(store);
}

store.MessageWriter.Clear();
store.MessageWriter.Write((byte)RpcMessageType.RpcCallException);
store.MessageWriter.Write(message_return_id);
}

var exception = new RpcRemoteExceptionDataContract(ex is TargetInvocationException ? ex.InnerException : ex);
private void SendRpcException(SerializationStore.Store store, Exception ex, ushort message_return_id) {
store.Stream.SetLength(0);

RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, exception, exception.GetType(), PrefixStyle.Base128, 1);
store.MessageWriter.Clear();
store.MessageWriter.Write((byte)RpcMessageType.RpcCallException);
store.MessageWriter.Write(message_return_id);

store.MessageWriter.Write(store.Stream.ToArray());
var exception = new RpcRemoteExceptionDataContract(ex is TargetInvocationException ? ex.InnerException : ex);

Send(store.MessageWriter.ToMessage(true));
RuntimeTypeModel.Default.SerializeWithLengthPrefix(store.Stream, exception, exception.GetType(), PrefixStyle.Base128, 1);

} finally {
ReadWriteStore.Put(store);
}
store.MessageWriter.Write(store.Stream.ToArray());

Send(store.MessageWriter.ToMessage(true));

}

Expand All @@ -195,7 +217,7 @@ public RpcReturnCallWait CreateReturnCallWait() {


private void ProcessRpcReturn(MqMessage mq_message) {
var store = ReadWriteStore.Get();
var store = Store.Get();
try {
store.MessageReader.Message = mq_message;

Expand All @@ -212,7 +234,7 @@ private void ProcessRpcReturn(MqMessage mq_message) {

call_wait.ReturnResetEvent.Set();
} finally {
ReadWriteStore.Put(store);
Store.Put(store);
}
}

Expand Down
50 changes: 50 additions & 0 deletions DtronixMessageQueue.Rpc/SerializationStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.Collections.Concurrent;
using System.IO;

namespace DtronixMessageQueue.Rpc {
public class SerializationStore {
private readonly MqSocketConfig config;

public class Store {
public MqMessageWriter MessageWriter;
public MqMessageReader MessageReader;
public MemoryStream Stream;
}

private readonly ConcurrentQueue<Store> reader_writers = new ConcurrentQueue<Store>();

public SerializationStore(MqSocketConfig config) {
this.config = config;
}

public Store Get() {
Store store;
if (reader_writers.TryDequeue(out store) == false) {

var mq_writer = new MqMessageWriter(config);
var mq_reader = new MqMessageReader();

store = new Store {
MessageWriter = mq_writer,
MessageReader = mq_reader,
Stream = new MemoryStream()

};
} else {
store.MessageWriter.Clear();
store.Stream.SetLength(0);
}

//


return store;
}

public void Put(Store store) {
reader_writers.Enqueue(store);
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public RpcPerformanceTest(string[] args) {
Port = 2828
};

RpcSingleProcessTest(100000, 4, config, RpcTestType.NoRetrun);
RpcSingleProcessTest(100, 4, config, RpcTestType.NoRetrun);

RpcSingleProcessTest(1000, 4, config, RpcTestType.Return);

Expand Down

0 comments on commit 2af8c77

Please sign in to comment.