Skip to content

Commit

Permalink
Added new SendMessage method overload without payload type
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahmad Noman Musleh committed Jan 17, 2022
1 parent 26fb7d2 commit ea057a7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 13 deletions.
11 changes: 9 additions & 2 deletions src/OpenAPI.Net/Helpers/MessageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static ProtoMessage GetMessage<T>(this T message, ProtoOAPayloadType payl
public static IMessage GetMessage(ProtoMessage protoMessage)
{
var payload = protoMessage.Payload;

return protoMessage.PayloadType switch
{
(int)ProtoOAPayloadType.ProtoOaErrorRes => ProtoOAErrorRes.Parser.ParseFrom(payload),
Expand Down Expand Up @@ -89,7 +89,14 @@ public static IMessage GetMessage(ProtoMessage protoMessage)
};
}

private static ProtoMessage GetMessage(uint payloadType, ByteString payload, string clientMessageId = null)
/// <summary>
/// Returns a ProtoMessage based on your provided parameters
/// </summary>
/// <param name="payloadType">The message payloadType as unint</param>
/// <param name="payload">The message payload as a ByteString</param>
/// <param name="clientMessageId">The client message ID for ProtoMessage</param>
/// <returns>ProtoMessage</returns>
public static ProtoMessage GetMessage(uint payloadType, ByteString payload, string clientMessageId = null)
{
var message = new ProtoMessage
{
Expand Down
32 changes: 32 additions & 0 deletions src/OpenAPI.Net/Helpers/MessagePayloadTypeExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Google.Protobuf;
using System;
using System.Reflection;

namespace OpenAPI.Net.Helpers
{
public static class MessagePayloadTypeExtension
{
/// <summary>
/// This method returns the payload type of a message
/// </summary>
/// <typeparam name="T">IMessage</typeparam>
/// <param name="message">The message</param>
/// <returns>uint (Payload Type)</returns>
/// <exception cref="InvalidOperationException"></exception>
public static uint GetPayloadType<T>(this T message) where T : IMessage
{
PropertyInfo property;

try
{
property = message.GetType().GetProperty("PayloadType");
}
catch (Exception ex) when (ex is AmbiguousMatchException || ex is ArgumentNullException)
{
throw new InvalidOperationException($"Couldn't get the PayloadType of the message {message}", ex);
}

return (uint)property.GetValue(message);
}
}
}
2 changes: 1 addition & 1 deletion src/OpenAPI.Net/OpenAPI.Net.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<PackageTags>cTrader, Open API, Spotware</PackageTags>
<Description>A .NET RX library for Spotware Open API</Description>
<PackageId>Spotware.OpenAPI.Net</PackageId>
<Version>1.3.6-rc0</Version>
<Version>1.3.6-rc1</Version>
<Platforms>AnyCPU</Platforms>
<Company>Spotware</Company>
<Authors>Spotware</Authors>
Expand Down
38 changes: 28 additions & 10 deletions src/OpenAPI.Net/OpenClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public sealed class OpenClient : IDisposable, IObservable<IMessage>

private readonly ConcurrentDictionary<int, IObserver<IMessage>> _observers = new ConcurrentDictionary<int, IObserver<IMessage>>();

private readonly CancellationTokenSource _messagesCancellationTokenSource = new CancellationTokenSource();
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

private readonly TimeSpan _requestDelay;

Expand All @@ -36,8 +36,6 @@ public sealed class OpenClient : IDisposable, IObservable<IMessage>

private SslStream _sslStream;

private IDisposable _listenerDisposable;

private IDisposable _heartbeatDisposable;

private IDisposable _webSocketDisconnectionHappenedDisposable;
Expand Down Expand Up @@ -129,7 +127,7 @@ public async Task Connect()
await ConnectTcp();
}

_ = StartSendingMessages(_messagesCancellationTokenSource.Token);
_ = StartSendingMessages(_cancellationTokenSource.Token);

_heartbeatDisposable = Observable.Interval(_heartbeatInerval).DoWhile(() => !IsDisposed)
.Subscribe(x => SendHeartbeat());
Expand Down Expand Up @@ -180,7 +178,7 @@ private async Task ConnectTcp()

await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(false);

_ = ReadTcp();
_ = ReadTcp(_cancellationTokenSource.Token);
}

/// <summary>
Expand All @@ -197,6 +195,24 @@ public IDisposable Subscribe(IObserver<IMessage> observer)
return Disposable.Create(() => OnObserverDispose(observer));
}

/// <summary>
/// This method will insert your message on messages queue, it will not send the message instantly
/// By using this overload of SendMessage method you avoid passing the message payload type
/// and it gets the payload from message itself
/// </summary>
/// <typeparam name="T">Message Type</typeparam>
/// <param name="message">Message</param>
/// <param name="clientMsgId">The client message ID (optional)</param>
/// <exception cref="InvalidOperationException">If getting message payload type fails</exception>
/// <returns>Task</returns>
public async Task SendMessage<T>(T message, string clientMsgId = null) where T :
IMessage
{
var protoMessage = MessageFactory.GetMessage(message.GetPayloadType(), message.ToByteString(), clientMsgId);

await SendMessage(protoMessage);
}

/// <summary>
/// This method will insert your message on messages queue, it will not send the message instantly
/// </summary>
Expand Down Expand Up @@ -316,9 +332,8 @@ public void Dispose()
IsDisposed = true;

_heartbeatDisposable?.Dispose();
_listenerDisposable?.Dispose();

_messagesCancellationTokenSource.Cancel();
_cancellationTokenSource.Cancel();

_messagesChannel.Writer.TryComplete();

Expand Down Expand Up @@ -346,7 +361,7 @@ public void Dispose()
/// This method will read the TCP stream for incoming messages
/// </summary>
/// <returns>Task</returns>
private async Task ReadTcp()
private async Task ReadTcp(CancellationToken cancellationToken)
{
while (!IsDisposed)
{
Expand All @@ -360,7 +375,7 @@ private async Task ReadTcp()
{
var count = lengthArray.Length - readBytes;

readBytes += await _sslStream.ReadAsync(lengthArray, readBytes, count).ConfigureAwait(false);
readBytes += await _sslStream.ReadAsync(lengthArray, readBytes, count, cancellationToken).ConfigureAwait(false);
}
while (readBytes < lengthArray.Length);

Expand All @@ -378,14 +393,17 @@ private async Task ReadTcp()
{
var count = data.Length - readBytes;

readBytes += await _sslStream.ReadAsync(data, readBytes, count).ConfigureAwait(false);
readBytes += await _sslStream.ReadAsync(data, readBytes, count, cancellationToken).ConfigureAwait(false);
}
while (readBytes < length);

var message = ProtoMessage.Parser.ParseFrom(data);

OnNext(message);
}
catch (TaskCanceledException)
{
}
catch (Exception ex)
{
var readException = new ReadException(ex);
Expand Down

0 comments on commit ea057a7

Please sign in to comment.