From 76c2a4da6a7554885882d09594b74bcf98d713b4 Mon Sep 17 00:00:00 2001 From: Friedrich von Never Date: Sun, 7 Jul 2019 21:36:47 +0700 Subject: [PATCH] Async everything (#29) XMPP connection turned out to work in an asynchronous manner, thus I'm making small part of our stack asynchronous, too, to work around that. --- Emulsion.Tests/MessageSystemTests.fs | 10 ++++++++-- Emulsion/MessageSystem.fs | 27 ++++++++++++++------------ Emulsion/Telegram/Client.fs | 2 +- Emulsion/Xmpp/XmppClient.fs | 29 ++++++++++++++++++++++------ 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/Emulsion.Tests/MessageSystemTests.fs b/Emulsion.Tests/MessageSystemTests.fs index e6c6d57e..7d3da1ae 100644 --- a/Emulsion.Tests/MessageSystemTests.fs +++ b/Emulsion.Tests/MessageSystemTests.fs @@ -11,15 +11,21 @@ open Emulsion.MessageSystem let private performTest expectedStage runBody = use cts = new CancellationTokenSource() let mutable stage = 0 - let run() = + let run = async { stage <- stage + 1 runBody cts stage + } let context = { cooldown = TimeSpan.Zero logError = ignore logMessage = ignore } - MessageSystem.wrapRun context cts.Token run + + try + Async.RunSynchronously(MessageSystem.wrapRun context run, cancellationToken = cts.Token) + with + | :? OperationCanceledException -> () + Assert.Equal(expectedStage, stage) [] diff --git a/Emulsion/MessageSystem.fs b/Emulsion/MessageSystem.fs index 6fb27e44..48ec7442 100644 --- a/Emulsion/MessageSystem.fs +++ b/Emulsion/MessageSystem.fs @@ -20,16 +20,18 @@ type RestartContext = { logMessage: string -> unit } -let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: unit -> unit) : unit = - while not token.IsCancellationRequested do - try - run() - with - | :? OperationCanceledException -> () - | ex -> - ctx.logError ex - ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown - Thread.Sleep ctx.cooldown +let internal wrapRun (ctx: RestartContext) (runAsync: Async) : Async = + async { + while true do + try + do! runAsync + with + | :? OperationCanceledException -> return () + | ex -> + ctx.logError ex + ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown + do! Async.Sleep(int ctx.cooldown.TotalMilliseconds) + } let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) = messageSystem.PutMessage message @@ -44,7 +46,7 @@ type MessageSystemBase(ctx: RestartContext, cancellationToken: CancellationToken /// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or /// return a unit. - abstract member RunUntilError : IncomingMessageReceiver -> unit + abstract member RunUntilError : IncomingMessageReceiver -> Async /// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then /// will be restarted later. @@ -52,6 +54,7 @@ type MessageSystemBase(ctx: RestartContext, cancellationToken: CancellationToken interface IMessageSystem with member ms.Run receiver = - wrapRun ctx cancellationToken (fun () -> this.RunUntilError receiver) + Async.RunSynchronously (wrapRun ctx (this.RunUntilError receiver), cancellationToken = cancellationToken) + member __.PutMessage message = MessageSender.send sender message diff --git a/Emulsion/Telegram/Client.fs b/Emulsion/Telegram/Client.fs index 54c7d659..2364eec6 100644 --- a/Emulsion/Telegram/Client.fs +++ b/Emulsion/Telegram/Client.fs @@ -9,7 +9,7 @@ type Client(ctx: RestartContext, cancellationToken: CancellationToken, settings: inherit MessageSystemBase(ctx, cancellationToken) override __.RunUntilError receiver = - Funogram.run settings cancellationToken receiver + async { Funogram.run settings cancellationToken receiver } override __.Send message = Funogram.send settings message diff --git a/Emulsion/Xmpp/XmppClient.fs b/Emulsion/Xmpp/XmppClient.fs index a5a29104..535f281f 100644 --- a/Emulsion/Xmpp/XmppClient.fs +++ b/Emulsion/Xmpp/XmppClient.fs @@ -7,6 +7,7 @@ open System.Xml.Linq open SharpXMPP open SharpXMPP.XMPP +open System.Threading.Tasks open Emulsion open Emulsion.Settings @@ -44,15 +45,31 @@ let create (settings: XmppSettings): XmppClient = client.add_Presence(presenceHandler) client -let run (settings: XmppSettings) (client: XmppClient) (onMessage: Message -> unit): unit = +exception ConnectionFailedError of string + +let run (settings: XmppSettings) (client: XmppClient) (onMessage: Message -> unit): Async = printfn "Bot name: %s" client.Jid.FullJid let handler = messageHandler settings onMessage + let tcs = TaskCompletionSource() + let connectionFailedHandler = + XmppConnection.ConnectionFailedHandler( + fun _ error -> tcs.SetException(ConnectionFailedError error.Message) + ) + + async { + try + let! token = Async.CancellationToken + use _ = token.Register(fun () -> client.Close()) + + client.add_Message handler + client.add_ConnectionFailed connectionFailedHandler + client.Connect() - try - client.Connect() - client.add_Message handler - finally - client.remove_Message handler + do! Async.AwaitTask tcs.Task + finally + client.remove_ConnectionFailed connectionFailedHandler + client.remove_Message handler + } let send (settings: XmppSettings) (client: XmppClient) (message: Message): unit = let text = sprintf "<%s> %s" message.author message.text