Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Renamed SessionConnectedEventArgs.cs to SessionEventArgs to be used i…
Browse files Browse the repository at this point in the history
…n multiple places.

Fixed tests hanging.
Added new event on SocketBase called SessionSetup.  Called before the session is connected but after initial setup.
  • Loading branch information
DJGosnell committed Sep 15, 2016
1 parent 5f8c537 commit 1f8ae73
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/DtronixMessageQueue.Tests/Rpc/RpcClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class Test {
[Fact]
public void Client_calls_proxy_method() {

Server.Connected += (sender, args) => {
Server.SessionSetup += (sender, args) => {
args.Session.AddService(new CalculatorService());
};

Expand All @@ -46,8 +46,8 @@ public void Client_calls_proxy_method() {
[Fact]
public void Client_calls_proxy_method_sequential() {

Server.Connected += (sender, args) => {
args.Session.AddService<ICalculatorService>(new CalculatorService());
Server.SessionSetup += (sender, args) => {
args.Session.AddService(new CalculatorService());
};


Expand All @@ -71,7 +71,7 @@ public void Client_calls_proxy_method_sequential() {
[Fact]
public void Client_calls_proxy_method_and_canceles() {

Server.Connected += (sender, args) => {
Server.SessionSetup += (sender, args) => {
var service = new CalculatorService();
args.Session.AddService<ICalculatorService>(service);

Expand Down
2 changes: 1 addition & 1 deletion src/DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<Compile Include="Rpc\RpcSession.cs" />
<Compile Include="Rpc\SerializationStore.cs" />
<Compile Include="Socket\BufferManager.cs" />
<Compile Include="Socket\SessionConnectedEventArgs.cs" />
<Compile Include="Socket\SessionEventArgs.cs" />
<Compile Include="Socket\SessionClosedEventArgs.cs" />
<Compile Include="Socket\SocketAsyncEventArgsPool.cs" />
<Compile Include="Socket\SocketBase.cs" />
Expand Down
3 changes: 2 additions & 1 deletion src/DtronixMessageQueue/Rpc/RpcServer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.Sockets;
using System;
using System.Net.Sockets;
using Amib.Threading;

namespace DtronixMessageQueue.Rpc {
Expand Down
6 changes: 3 additions & 3 deletions src/DtronixMessageQueue/Rpc/RpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Reflection;
using System.Runtime.Remoting.Proxies;
using System.Threading;
using System.Threading.Tasks;
using Amib.Threading;
using DtronixMessageQueue.Socket;
using ProtoBuf;
Expand All @@ -31,6 +30,9 @@ public class RpcSession<TSession, TConfig> : MqSession<TSession, TConfig>
/// </summary>
public SerializationStore Store { get; private set; }

/// <summary>
/// Thread pool for performing tasks on this session.
/// </summary>
private SmartThreadPool worker_thread_pool;

/// <summary>
Expand Down Expand Up @@ -73,8 +75,6 @@ public override void OnIncomingMessage(object sender, IncomingMessageEventArgs<T
// Read the type of message.
var message_type = (RpcMessageType) message[0].ReadByte(0);

//var message_type = Enum ;

switch (message_type) {
case RpcMessageType.Command:
ProcessRpcCommand(message, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace DtronixMessageQueue.Socket {
/// Event args used when the session has connected to a remote endpoint.
/// </summary>
/// <typeparam name="TSession">Session type.</typeparam>
public class SessionConnectedEventArgs<TSession, TConfig> : EventArgs
public class SessionEventArgs<TSession, TConfig> : EventArgs
where TSession : SocketSession<TConfig>
where TConfig : SocketConfig {

Expand All @@ -19,7 +19,7 @@ public class SessionConnectedEventArgs<TSession, TConfig> : EventArgs
/// Creates a new instance of the session connected event args.
/// </summary>
/// <param name="session">Connected session.</param>
public SessionConnectedEventArgs(TSession session) {
public SessionEventArgs(TSession session) {
Session = session;
}

Expand Down
10 changes: 8 additions & 2 deletions src/DtronixMessageQueue/Socket/SocketBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ public abstract class SocketBase<TSession, TConfig>
/// <summary>
/// This event fires when a connection has been established.
/// </summary>
public event EventHandler<SessionConnectedEventArgs<TSession, TConfig>> Connected;
public event EventHandler<SessionEventArgs<TSession, TConfig>> Connected;

/// <summary>
/// This event fires when a connection has been closed.
/// </summary>
public event EventHandler<SessionClosedEventArgs<TSession, TConfig>> Closed;

/// <summary>
/// Event called when a new session is created and is being setup but before the session is active.
/// </summary>
public event EventHandler<SessionEventArgs<TSession, TConfig>> SessionSetup;

/// <summary>
/// Configurations of this socket.
Expand Down Expand Up @@ -60,7 +65,7 @@ protected SocketBase(TConfig config) {
/// </summary>
/// <param name="session">Session that connected.</param>
protected virtual void OnConnect(TSession session) {
Connected?.Invoke(this, new SessionConnectedEventArgs<TSession, TConfig>(session));
Connected?.Invoke(this, new SessionEventArgs<TSession, TConfig>(session));
}


Expand Down Expand Up @@ -107,6 +112,7 @@ protected void Setup() {
protected virtual TSession CreateSession(System.Net.Sockets.Socket socket) {
var session = new TSession();
SocketSession<TConfig>.Setup(session, socket, AsyncPool, Config);
SessionSetup?.Invoke(this, new SessionEventArgs<TSession, TConfig>(session));
session.Closed += (sender, args) => OnClose(session, args.CloseReason);
return session;
}
Expand Down
3 changes: 2 additions & 1 deletion src/DtronixMessageQueue/Socket/SocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public void Connect(IPEndPoint end_point) {
event_arg.Completed += (sender, args) => {
if (args.LastOperation == SocketAsyncOperation.Connect) {
Session = CreateSession(MainSocket);
OnConnect(Session);

Session.Start();
OnConnect(Session);
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/DtronixMessageQueue/Socket/SocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ private void AcceptCompleted(SocketAsyncEventArgs e) {
// Add this session to the list of connected sessions.
ConnectedSessions.TryAdd(session.Id, session);

// Invoke the events.
OnConnect(session);

// Start the session.
session.Start();

// Invoke the events.
OnConnect(session);

// Accept the next connection request
StartAccept(e);
}
Expand Down
9 changes: 5 additions & 4 deletions src/DtronixMessageQueue/Socket/SocketSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ public enum State : byte {
/// <summary>
/// This event fires when a connection has been established.
/// </summary>
public event EventHandler<SessionConnectedEventArgs<SocketSession<TConfig>, TConfig>> Connected;
public event EventHandler<SessionEventArgs<SocketSession<TConfig>, TConfig>> Connected;

/// <summary>
/// This event fires when a connection has been shutdown.
/// </summary>
public event EventHandler<SessionClosedEventArgs<SocketSession<TConfig>, TConfig>> Closed;


/// <summary>
/// Creates a new socket session with a new Id.
/// </summary>
Expand Down Expand Up @@ -147,15 +148,15 @@ public static void Setup(SocketSession<TConfig> session, System.Net.Sockets.Sock
socket.NoDelay = true;
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);

session.CurrentState = State.Connecting;

session.OnSetup();
}

internal void Start() {
if (CurrentState == State.Connecting) {
// Start receiving data.
CurrentState = State.Connected;
socket.ReceiveAsync(receive_args);

}
}

Expand All @@ -171,7 +172,7 @@ protected virtual void OnSetup() {
/// </summary>
protected void OnConnected() {
//logger.Info("Session {0}: Connected", Id);
Connected?.Invoke(this, new SessionConnectedEventArgs<SocketSession<TConfig>, TConfig>(this));
Connected?.Invoke(this, new SessionEventArgs<SocketSession<TConfig>, TConfig>(this));
}

/// <summary>
Expand Down

0 comments on commit 1f8ae73

Please sign in to comment.