From ea057a7501bcf88d4e1218e50c352e0e84565d7e Mon Sep 17 00:00:00 2001 From: Ahmad Noman Musleh Date: Mon, 17 Jan 2022 12:42:02 +0300 Subject: [PATCH] Added new SendMessage method overload without payload type --- src/OpenAPI.Net/Helpers/MessageFactory.cs | 11 +++++- .../Helpers/MessagePayloadTypeExtension.cs | 32 ++++++++++++++++ src/OpenAPI.Net/OpenAPI.Net.csproj | 2 +- src/OpenAPI.Net/OpenClient.cs | 38 ++++++++++++++----- 4 files changed, 70 insertions(+), 13 deletions(-) create mode 100644 src/OpenAPI.Net/Helpers/MessagePayloadTypeExtension.cs diff --git a/src/OpenAPI.Net/Helpers/MessageFactory.cs b/src/OpenAPI.Net/Helpers/MessageFactory.cs index 5d59d0c..730daea 100644 --- a/src/OpenAPI.Net/Helpers/MessageFactory.cs +++ b/src/OpenAPI.Net/Helpers/MessageFactory.cs @@ -40,7 +40,7 @@ public static ProtoMessage GetMessage(this T message, ProtoOAPayloadType payl public static IMessage GetMessage(ProtoMessage protoMessage) { var payload = protoMessage.Payload; - + return protoMessage.PayloadType switch { (int)ProtoOAPayloadType.ProtoOaErrorRes => ProtoOAErrorRes.Parser.ParseFrom(payload), @@ -89,7 +89,14 @@ public static IMessage GetMessage(ProtoMessage protoMessage) }; } - private static ProtoMessage GetMessage(uint payloadType, ByteString payload, string clientMessageId = null) + /// + /// Returns a ProtoMessage based on your provided parameters + /// + /// The message payloadType as unint + /// The message payload as a ByteString + /// The client message ID for ProtoMessage + /// ProtoMessage + public static ProtoMessage GetMessage(uint payloadType, ByteString payload, string clientMessageId = null) { var message = new ProtoMessage { diff --git a/src/OpenAPI.Net/Helpers/MessagePayloadTypeExtension.cs b/src/OpenAPI.Net/Helpers/MessagePayloadTypeExtension.cs new file mode 100644 index 0000000..c2c5224 --- /dev/null +++ b/src/OpenAPI.Net/Helpers/MessagePayloadTypeExtension.cs @@ -0,0 +1,32 @@ +using Google.Protobuf; +using System; +using System.Reflection; + +namespace OpenAPI.Net.Helpers +{ + public static class MessagePayloadTypeExtension + { + /// + /// This method returns the payload type of a message + /// + /// IMessage + /// The message + /// uint (Payload Type) + /// + public static uint GetPayloadType(this T message) where T : IMessage + { + PropertyInfo property; + + try + { + property = message.GetType().GetProperty("PayloadType"); + } + catch (Exception ex) when (ex is AmbiguousMatchException || ex is ArgumentNullException) + { + throw new InvalidOperationException($"Couldn't get the PayloadType of the message {message}", ex); + } + + return (uint)property.GetValue(message); + } + } +} diff --git a/src/OpenAPI.Net/OpenAPI.Net.csproj b/src/OpenAPI.Net/OpenAPI.Net.csproj index b74976b..f6ef7c2 100644 --- a/src/OpenAPI.Net/OpenAPI.Net.csproj +++ b/src/OpenAPI.Net/OpenAPI.Net.csproj @@ -9,7 +9,7 @@ cTrader, Open API, Spotware A .NET RX library for Spotware Open API Spotware.OpenAPI.Net - 1.3.6-rc0 + 1.3.6-rc1 AnyCPU Spotware Spotware diff --git a/src/OpenAPI.Net/OpenClient.cs b/src/OpenAPI.Net/OpenClient.cs index a385429..80b227e 100644 --- a/src/OpenAPI.Net/OpenClient.cs +++ b/src/OpenAPI.Net/OpenClient.cs @@ -26,7 +26,7 @@ public sealed class OpenClient : IDisposable, IObservable private readonly ConcurrentDictionary> _observers = new ConcurrentDictionary>(); - private readonly CancellationTokenSource _messagesCancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); private readonly TimeSpan _requestDelay; @@ -36,8 +36,6 @@ public sealed class OpenClient : IDisposable, IObservable private SslStream _sslStream; - private IDisposable _listenerDisposable; - private IDisposable _heartbeatDisposable; private IDisposable _webSocketDisconnectionHappenedDisposable; @@ -129,7 +127,7 @@ public async Task Connect() await ConnectTcp(); } - _ = StartSendingMessages(_messagesCancellationTokenSource.Token); + _ = StartSendingMessages(_cancellationTokenSource.Token); _heartbeatDisposable = Observable.Interval(_heartbeatInerval).DoWhile(() => !IsDisposed) .Subscribe(x => SendHeartbeat()); @@ -180,7 +178,7 @@ private async Task ConnectTcp() await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(false); - _ = ReadTcp(); + _ = ReadTcp(_cancellationTokenSource.Token); } /// @@ -197,6 +195,24 @@ public IDisposable Subscribe(IObserver observer) return Disposable.Create(() => OnObserverDispose(observer)); } + /// + /// This method will insert your message on messages queue, it will not send the message instantly + /// By using this overload of SendMessage method you avoid passing the message payload type + /// and it gets the payload from message itself + /// + /// Message Type + /// Message + /// The client message ID (optional) + /// If getting message payload type fails + /// Task + public async Task SendMessage(T message, string clientMsgId = null) where T : + IMessage + { + var protoMessage = MessageFactory.GetMessage(message.GetPayloadType(), message.ToByteString(), clientMsgId); + + await SendMessage(protoMessage); + } + /// /// This method will insert your message on messages queue, it will not send the message instantly /// @@ -316,9 +332,8 @@ public void Dispose() IsDisposed = true; _heartbeatDisposable?.Dispose(); - _listenerDisposable?.Dispose(); - _messagesCancellationTokenSource.Cancel(); + _cancellationTokenSource.Cancel(); _messagesChannel.Writer.TryComplete(); @@ -346,7 +361,7 @@ public void Dispose() /// This method will read the TCP stream for incoming messages /// /// Task - private async Task ReadTcp() + private async Task ReadTcp(CancellationToken cancellationToken) { while (!IsDisposed) { @@ -360,7 +375,7 @@ private async Task ReadTcp() { var count = lengthArray.Length - readBytes; - readBytes += await _sslStream.ReadAsync(lengthArray, readBytes, count).ConfigureAwait(false); + readBytes += await _sslStream.ReadAsync(lengthArray, readBytes, count, cancellationToken).ConfigureAwait(false); } while (readBytes < lengthArray.Length); @@ -378,7 +393,7 @@ private async Task ReadTcp() { var count = data.Length - readBytes; - readBytes += await _sslStream.ReadAsync(data, readBytes, count).ConfigureAwait(false); + readBytes += await _sslStream.ReadAsync(data, readBytes, count, cancellationToken).ConfigureAwait(false); } while (readBytes < length); @@ -386,6 +401,9 @@ private async Task ReadTcp() OnNext(message); } + catch (TaskCanceledException) + { + } catch (Exception ex) { var readException = new ReadException(ex);