Skip to content
This repository has been archived by the owner on Jun 16, 2022. It is now read-only.

Commit

Permalink
Re-implementation of the Socket base and Server. Working on client.
Browse files Browse the repository at this point in the history
  • Loading branch information
DJGosnell committed Aug 23, 2016
1 parent 39bcd28 commit 75923ce
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 367 deletions.
2 changes: 2 additions & 0 deletions DtronixMessageQueue/DtronixMessageQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@
<Compile Include="MqServerReceiveFilterFactory.cs" />
<Compile Include="MqSession.cs" />
<Compile Include="Socket\BufferManager.cs" />
<Compile Include="Socket\SessionChangedEventArgs.cs" />
<Compile Include="Socket\SocketAsyncEventArgsPool.cs" />
<Compile Include="Socket\SocketBase.cs" />
<Compile Include="Socket\SocketClient.cs" />
<Compile Include="Socket\SocketConfig.cs" />
<Compile Include="Socket\SocketServer.cs" />
<Compile Include="Socket\SocketSession.cs" />
<None Include="app.config" />
Expand Down
31 changes: 10 additions & 21 deletions DtronixMessageQueue/MqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,21 @@
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using SuperSocket.ClientEngine;
using SuperSocket.Common;
using SuperSocket.ProtoBase;
using DtronixMessageQueue.Socket;


namespace DtronixMessageQueue {

/// <summary>
/// Client used to connect to a remote message queue server.
/// </summary>
public class MqClient : EasyClientBase, IDisposable {
public class MqClient : SocketClient<MqSession> {

/// <summary>
/// Internal postmaster.
/// </summary>
private readonly MqPostmaster postmaster;

/// <summary>
/// Mailbox for this client.
/// </summary>
private readonly MqMailbox mailbox;

/// <summary>
/// Represents the maximum size in bytes that a frame can be. (including headers)
/// </summary>
Expand All @@ -40,17 +34,12 @@ public class MqClient : EasyClientBase, IDisposable {
/// <summary>
/// Initializes a new instance of a message queue.
/// </summary>
public MqClient() {
PipeLineProcessor = new DefaultPipelineProcessor<BufferedPackageInfo>(new MqClientReceiveFilter(), MqFrame.MaxFrameSize);
public MqClient(SocketConfig config) : base(config) {

postmaster = new MqPostmaster() {
postmaster = new MqPostmaster {
MaxReaders = 2,
MaxWriters = 2
};

mailbox = new MqMailbox(postmaster, this);

mailbox.IncomingMessage += OnIncomingMessage;
}

/// <summary>
Expand All @@ -66,7 +55,7 @@ private void OnIncomingMessage(object sender, IncomingMessageEventArgs e) {
/// Internal method to retrieve all the bytes on the wire and enqueue them to be processed by a separate thread.
/// </summary>
/// <param name="package">Package to process</param>
protected override void HandlePackage(IPackageInfo package) {
/*protected override void HandlePackage(IPackageInfo package) {
var buff_package = package as BufferedPackageInfo;
if (buff_package == null) {
Expand All @@ -91,17 +80,17 @@ protected override void HandlePackage(IPackageInfo package) {
// Enqueue the incoming message to be processed by the postmaster.
mailbox.EnqueueIncomingBuffer(buffer);
}
}*/

/// <summary>
/// Connects to a serve endpoint.
/// </summary>
/// <param name="address">Server address to connect to.</param>
/// <param name="port">Server port to connect to. Default is 2828.</param>
/// <returns>Awaitable task.</returns>
public Task ConnectAsync(string address, int port = 2828) {
/*public Task ConnectAsync(string address, int port = 2828) {
return ConnectAsync(new IPEndPoint(IPAddress.Parse(address), port));
}
}*/

/// <summary>
/// Adds a message to the outbox to be processed.
Expand All @@ -118,7 +107,7 @@ public void Send(MqMessage message) {
}

// Enqueue the outgoing message to be processed by the postmaster.
mailbox.EnqueueOutgoingMessage(message);
Session.Mailbox.EnqueueOutgoingMessage(message);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion DtronixMessageQueue/MqFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public MqFrameType FrameType {
}
}

public const int MaxFrameSize = 1024*16 - HeaderLength;
public const int MaxFrameSize = 1024*4 - HeaderLength;

public const int HeaderLength = 3;

Expand Down
33 changes: 8 additions & 25 deletions DtronixMessageQueue/MqMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ public class MqMailbox : IDisposable {
private readonly MqPostmaster postmaster;


private readonly MqClient client;

/// <summary>
/// Client reference. If this mailbox is run as a server mailbox, this is then null.
/// </summary>
public MqClient Client => client;


private readonly MqSession session;

/// <summary>
Expand Down Expand Up @@ -84,17 +76,6 @@ public MqMailbox(MqPostmaster postmaster, MqSession session) {
frame_builder = new MqFrameBuilder();
}

/// <summary>
/// Initializes a new instance of the MqMailbox class.
/// </summary>
/// <param name="postmaster">Reference to the postmaster for this instance.</param>
/// <param name="client">Client reference for this instance.</param>
public MqMailbox(MqPostmaster postmaster, MqClient client) {
this.postmaster = postmaster;
this.client = client;
frame_builder = new MqFrameBuilder();
}

/// <summary>
/// Adds bytes from the client/server reading methods to be processed by the postmaster.
/// </summary>
Expand Down Expand Up @@ -144,13 +125,15 @@ private void SendBufferQueue(Queue<byte[]> buffer_queue, int length) {
offset += bytes.Length;
}

if (client != null) {
client.Send(buffer);
} else {
session.Send(buffer, 0, buffer.Length);
}



session.Send(buffer, 0, buffer.Length);

// Wait for the event args to return a successful send.
Session.WriteReset.Wait();



}


Expand Down
17 changes: 12 additions & 5 deletions DtronixMessageQueue/MqSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,34 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Protocol;
using DtronixMessageQueue.Socket;
using NLog;

namespace DtronixMessageQueue {
public class MqSession : AppSession<MqSession, RequestInfo<byte, byte[]>> {
public class MqSession : SocketSession {

private static readonly Logger logger = LogManager.GetCurrentClassLogger();

public MqMailbox Mailbox { get; set; }

/// <summary>
/// User supplied token used to pass a related object around with this session.
/// </summary>
public object Token { get; set; }

protected override void HandleIncomingBytes(byte[] buffer) {
Mailbox.EnqueueIncomingBuffer(buffer);
}


/// <summary>
/// Sends a message to the session's client.
/// </summary>
/// <param name="message">Message to send.</param>
public void Send(MqMessage message) {
if (Connected == false) {
/*if (Connected == false) {
throw new InvalidOperationException("Can not send messages while disconnected from server.");
}
}*/

if (message.Count == 0) {
return;
Expand Down
16 changes: 16 additions & 0 deletions DtronixMessageQueue/Socket/SessionChangedEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DtronixMessageQueue.Socket {
public class SessionChangedEventArgs<TSession> : EventArgs
where TSession : SocketSession {
public SessionChangedEventArgs(TSession session) {
Session = session;
}

public TSession Session { get; }
}
}
Loading

0 comments on commit 75923ce

Please sign in to comment.