diff --git a/ExchangeSharp/Utility/ClientWebSocket.cs b/ExchangeSharp/Utility/ClientWebSocket.cs index e8be871c..44bf504e 100644 --- a/ExchangeSharp/Utility/ClientWebSocket.cs +++ b/ExchangeSharp/Utility/ClientWebSocket.cs @@ -237,10 +237,10 @@ public void Start() { CreateWebSocket(); - // kick off message parser and message listener - Task.Run(MessageTask); - Task.Run(ReadTask); - } + // kick off message parser and message listener + Task.Run(MessageTask); + Task.Run(ReadTask); + } /// /// Close and dispose of all resources, stops the web socket and shuts it down. @@ -249,34 +249,38 @@ public void Dispose() { if (!disposed) { - disposed = true; - cancellationTokenSource.Cancel(); - Task.Run(async () => - { - try - { - if (webSocket.State == WebSocketState.Open) - { - if (CloseCleanly) - { - await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Dispose", cancellationToken); - } - else - { - await webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Dispose", cancellationToken); - } - } - } - catch (OperationCanceledException) - { - // dont care - } - catch (Exception ex) - { - Logger.Info(ex.ToString()); - } - }); - } + Task.Run(async () => + { + try + { + if (webSocket.State == WebSocketState.Open) + { + if (CloseCleanly) + { + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, + "Dispose", cancellationToken); + } + else + { + await webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, + "Dispose", cancellationToken); + } + } + disposed = true; + } + catch (OperationCanceledException) + { + // dont care + } + catch (Exception ex) + { + Logger.Info(ex.ToString()); + } + cancellationTokenSource.Cancel(); + webSocket.Dispose(); + messageQueue.CompleteAdding(); + }); + } } /// @@ -317,7 +321,7 @@ public Task SendMessageAsync(object message) private void QueueActions(params Func[] actions) { - if (actions != null && actions.Length != 0) + if (actions != null && actions.Length != 0 && !messageQueue.IsAddingCompleted) { Func[] actionsCopy = actions; messageQueue.Add((Func)(async () => @@ -339,8 +343,8 @@ private void QueueActions(params Func[] actions) private void QueueActionsWithNoExceptions(params Func[] actions) { - if (actions != null && actions.Length != 0) - { + if (actions != null && actions.Length != 0 && !messageQueue.IsAddingCompleted) + { Func[] actionsCopy = actions; messageQueue.Add((Func)(async () => { @@ -384,7 +388,6 @@ private async Task InvokeDisconnected(IWebSocket socket) private async Task ReadTask() { ArraySegment receiveBuffer = new ArraySegment(new byte[receiveChunkSize]); - TimeSpan keepAlive = webSocket.KeepAliveInterval; MemoryStream stream = new MemoryStream(); WebSocketReceiveResult result; bool wasConnected = false; @@ -411,7 +414,7 @@ private async Task ReadTask() // for lists, etc. QueueActionsWithNoExceptions(InvokeConnected); - while (webSocket.State == WebSocketState.Open) + while (webSocket.State == WebSocketState.Open && !disposed) { do { @@ -420,7 +423,8 @@ private async Task ReadTask() { if (result.MessageType == WebSocketMessageType.Close) { - await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken); + await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, + string.Empty, new CancellationToken()); // if it's closing, then let it complete QueueActions(InvokeDisconnected); } else @@ -429,8 +433,8 @@ private async Task ReadTask() } } } - while (result != null && !result.EndOfMessage); - if (stream.Length != 0) + while (result != null && !result.EndOfMessage && !disposed); + if (stream.Length != 0 && !disposed) { // if text message and we are handling text messages if (result.MessageType == WebSocketMessageType.Text && OnTextMessage != null) @@ -450,11 +454,15 @@ private async Task ReadTask() } } } - catch (OperationCanceledException) - { - // dont care - } - catch (Exception ex) + catch (OperationCanceledException) // includes TaskCanceledException + { + // dont care + } + catch (IOException ex) + { + Logger.Info(ex.ToString()); + } + catch (Exception ex) { // eat exceptions, most likely a result of a disconnect, either way we will re-create the web socket Logger.Info(ex.ToString()); @@ -466,7 +474,8 @@ private async Task ReadTask() } try { - webSocket.Dispose(); + stream.Close(); + webSocket.Dispose(); } catch (Exception ex) {