Skip to content

Commit

Permalink
Merge pull request #40 from tcz717/turnip-dev
Browse files Browse the repository at this point in the history
Heartbeat on RpcClient and Timeout on RpcServer
  • Loading branch information
panyz522 committed May 8, 2019
2 parents a627d45 + 61b5ad6 commit 87ac126
Show file tree
Hide file tree
Showing 22 changed files with 608 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
root = true

[*.cs]
end_of_line = lf
11 changes: 11 additions & 0 deletions SimCivil.Contract/IConnectionControl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace SimCivil.Contract
{
public interface IConnectionControl
{
void Noop();
}
}
1 change: 1 addition & 0 deletions SimCivil.Gate/Gate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private static ContainerBuilder ConfigureRpc(
builder.RegisterModule(module);
builder.Populate(services);
builder.UseRpcSession();
builder.RegisterRpcProvider<GatewayConnectionControl, IConnectionControl>().InstancePerChannel();
builder.RegisterRpcProvider<OrleansAuth, IAuth>().SingleInstance();
builder.RegisterRpcProvider<OrleansRoleManager, IRoleManager>().InstancePerChannel();
builder.RegisterRpcProvider<OrleansChunkViewSynchronizer, IViewSynchronizer>().SingleInstance().AsSelf();
Expand Down
18 changes: 18 additions & 0 deletions SimCivil.Gate/GatewayConnectionControl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using SimCivil.Contract;

namespace SimCivil.Gate
{
class GatewayConnectionControl : IConnectionControl
{
/// <summary>
/// An empty request for keeping alive
/// </summary>
public void Noop()
{
// An empty request for keeping alive
}
}
}
1 change: 1 addition & 0 deletions SimCivil.IntegrationTest/SimCivil.IntegrationTest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
<Compile Include="ServerConfigWindow.xaml.cs">
<DependentUpon>ServerConfigWindow.xaml</DependentUpon>
</Compile>
<Compile Include="Testcase\HeartbeatTest.cs" />
<Compile Include="Testcase\MovementTest.cs" />
<Compile Include="TextBoxLogger.cs" />
<Compile Include="TextBoxLoggerExtension.cs" />
Expand Down
109 changes: 109 additions & 0 deletions SimCivil.IntegrationTest/Testcase/HeartbeatTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using Microsoft.Extensions.Logging;
using Orleans;
using SimCivil.Contract;
using SimCivil.Contract.Model;
using SimCivil.Orleans.Interfaces;
using SimCivil.Rpc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Diagnostics.Debug;

namespace SimCivil.IntegrationTest.Testcase
{
class HeartbeatTest : IIntegrationTest
{
private static int _id;
protected RpcClient Client { get; private set; }
public IClusterClient Cluster { get; }
public bool IsRunning { get; private set; }
public ILogger<HeartbeatTest> Logger { get; }
public string RoleName { get; set; }

public HeartbeatTest(ILogger<HeartbeatTest> logger, IClusterClient cluster)
{
RoleName = nameof(HeartbeatTest) + Interlocked.Increment(ref _id);
Logger = logger;
Cluster = cluster;
}

private void NewClient()
{
Client = new RpcClient(
new IPEndPoint(
Dns.GetHostAddresses(Dns.GetHostName()).First(ip => ip.AddressFamily == AddressFamily.InterNetwork),
20170));
}

public Guid GetEntityId()
{
var rm = Client.Import<IRoleManager>();

return Task.Factory.StartNew(() => rm.GetRoleList().Result).Result.First().Id;
}

public Task Stop()
{
Client.Disconnect();
IsRunning = false;
return Task.CompletedTask;
}

public async Task Test()
{
IsRunning = true;
NewClient();
await Client.ConnectAsync();
await RegisterAndLogin();

// Create a client with heartbeat and wait
var rm = Client.Import<IRoleManager>();
await rm.CreateRole(new CreateRoleOption { Gender = Gender.Male, Name = RoleName, Race = Race.Human });
await rm.UseRole((await rm.GetRoleList()).First().Id);

await Task.Delay(15000);
Client.Disconnect();
Client.Dispose();
Logger.LogInformation($"{RoleName} client disconnected");
await Task.Delay(1000);

// Create a client without heartbeat and wait
NewClient();
Client.HeartbeatDelay = int.MaxValue;
await Client.ConnectAsync();
await Login();

rm = Client.Import<IRoleManager>();
await rm.UseRole((await rm.GetRoleList()).First().Id);

await Task.Delay(10000);
Client.Disconnect();
Client.Dispose();
Logger.LogInformation($"{RoleName} client disconnected");
await Task.Delay(1000);

Logger.LogInformation($"{RoleName} test end");
}

protected async Task RegisterAndLogin(string _name = null, string password = "")
{
var name = _name ?? RoleName;
await Cluster.GetGrain<IAccount>(name).Register(password);
Logger.LogInformation($"Role \"{RoleName}\" Created");
await Login(name, password);
}

protected async Task Login(string name = null, string password = "")
{
var auth = Client.Import<IAuth>();
await auth.LogInAsync(name ?? RoleName, password);

Logger.LogInformation($"Role \"{RoleName}\" login");
}
}
}
1 change: 1 addition & 0 deletions SimCivil.IntegrationTest/Testcase/MovementTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void Dispose()
if (IsRunning)
Stop().Wait();
Client?.Dispose();
Logger.LogInformation($"{RoleName} test disposed");
}

public bool IsRunning { get; private set; }
Expand Down
143 changes: 138 additions & 5 deletions SimCivil.Rpc/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Diagnostics.Debug;

using Castle.DynamicProxy;

Expand All @@ -38,6 +39,7 @@
using DotNetty.Transport.Channels.Sockets;

using SimCivil.Rpc.Callback;
using SimCivil.Contract;

namespace SimCivil.Rpc
{
Expand All @@ -49,6 +51,8 @@ public class RpcClient : IDisposable

private readonly ProxyGenerator _generator = new ProxyGenerator();
private readonly IChannelHandler _resolver;
private readonly IConnectionControl _connectionControl;
private readonly HeartbeatGenerator _heartbeatGenerator;
private int _nextCallbackId;

private long _nextSeq;
Expand All @@ -58,7 +62,9 @@ public class RpcClient : IDisposable
public Dictionary<Type, object> ProxyCache { get; } = new Dictionary<Type, object>();
public Dictionary<long, RpcRequest> ResponseWaitlist { get; } = new Dictionary<long, RpcRequest>();
public int ResponseTimeout { get; set; } = 3000;
public int HeartbeatDelay { get; set; } = 2000;
public IInterceptor Interceptor { get; }
public bool Connected { get; private set; }

public Dictionary<int, Delegate> CallBackList { get; } = new Dictionary<int, Delegate>();

Expand All @@ -70,13 +76,18 @@ public RpcClient(IPEndPoint endPoint)
Interceptor = new RpcInterceptor(this);
_resolver = new RpcClientResolver(this);
_callbackResolver = new RpcCallbackResolver(this);
_connectionControl = Import<IConnectionControl>();
_heartbeatGenerator = new HeartbeatGenerator(this);
}

/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
if (Channel?.Open ?? false)
Channel.CloseAsync().Wait();
_heartbeatGenerator.Dispose();
}

public event EventHandler<EventArgs<string>> DecodeFail;
Expand Down Expand Up @@ -122,6 +133,9 @@ public async Task ConnectAsync()

throw;
}
_heartbeatGenerator.Start();
_heartbeatGenerator.HeartbeatNeeded += (sender, args) => SendHeartbeat();
Connected = true;
}

protected virtual void ChannelInit(ISocketChannel channel)
Expand All @@ -134,15 +148,21 @@ protected virtual void ChannelInit(ISocketChannel channel)
.AddLast(_callbackResolver);
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void Disconnect()
{
Channel?.DisconnectAsync();
Channel = null;
ProxyCache.Clear();
if (Connected)
{
_heartbeatGenerator.Stop();
Channel?.DisconnectAsync();
Connected = false;
Channel = null;
ProxyCache.Clear();
}
}

/// <summary>
/// Imports reomote service.
/// Imports or gets remote service.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Expand Down Expand Up @@ -170,12 +190,125 @@ protected virtual void OnDecodeFail(EventArgs<string> e)
DecodeFail?.Invoke(this, e);
}

/// <summary>Attaches the callback and gets id.</summary>
/// <param name="delegate">The callback to be attached.</param>
/// <returns>Id of attached callback</returns>
public int AttachCallback(Delegate @delegate)
{
int id = Interlocked.Increment(ref _nextCallbackId);
CallBackList[id] = @delegate;

return id;
}

public void SendHeartbeat()
{
Task.Run(() =>
{
try
{
_connectionControl.Noop();
}
catch
{
// Heartbeat failed
Disconnect();
}
});
}

public void NotifyPacketSent()
{
if (!Connected)
throw new InvalidOperationException("Rpc Client is disconnected");
_heartbeatGenerator.NotifyPacketSent();
}
}

/// <summary>
/// Heartbeat generation indication
/// </summary>
class HeartbeatGenerator : IDisposable
{
private CancellationTokenSource cancel;
private Task runTask;
private bool sentPacket;
private readonly RpcClient client;

/// <summary>
/// Is running
/// </summary>
public bool IsRunning { get; private set; }

/// <summary>
/// Need to sent a heartbeat to server
/// </summary>
public event EventHandler<EventArgs> HeartbeatNeeded;

public HeartbeatGenerator(RpcClient client)
{
this.client = client;
}

/// <summary>
/// Start the daemon
/// </summary>
public void Start()
{
cancel = new CancellationTokenSource();
var tf = new TaskFactory(cancel.Token, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, TaskScheduler.Default);
runTask = tf.StartNew(Run);
IsRunning = true;
}

/// <summary>
/// Stop the daemon
/// </summary>
public void Stop()
{
cancel.Cancel();
runTask.Wait();
IsRunning = false;
}

/// <summary>
/// Notify that a request has been sent
/// </summary>
public void NotifyPacketSent()
{
if (!IsRunning)
throw new InvalidOperationException("HeartbeatGenerator is stopped");
sentPacket = true;
}

private void Run()
{
while (!cancel.IsCancellationRequested)
{
try
{
Task.Delay(client.HeartbeatDelay, cancel.Token).Wait();
}
catch
{
// Log cancelled
}
if (!sentPacket)
{
HeartbeatNeeded?.Invoke(this, new EventArgs());
}
else
{
sentPacket = false;
}
}
}

public void Dispose()
{
if (IsRunning)
Stop();
cancel.Dispose();
}
}
}
Loading

0 comments on commit 87ac126

Please sign in to comment.