Skip to content

Commit

Permalink
Async everything (#29)
Browse files Browse the repository at this point in the history
XMPP connection turned out to work in an asynchronous manner, thus I'm
making small part of our stack asynchronous, too, to work around that.
  • Loading branch information
ForNeVeR committed Jul 7, 2019
1 parent 7c2282b commit 5ff2e4d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
10 changes: 8 additions & 2 deletions Emulsion.Tests/MessageSystemTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@ open Emulsion.MessageSystem
let private performTest expectedStage runBody =
use cts = new CancellationTokenSource()
let mutable stage = 0
let run() =
let run = async {
stage <- stage + 1
runBody cts stage
}
let context = {
cooldown = TimeSpan.Zero
logError = ignore
logMessage = ignore
}
MessageSystem.wrapRun context cts.Token run

try
Async.RunSynchronously(MessageSystem.wrapRun context run, cancellationToken = cts.Token)
with
| :? OperationCanceledException -> ()

Assert.Equal(expectedStage, stage)

[<Fact>]
Expand Down
27 changes: 15 additions & 12 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ type RestartContext = {
logMessage: string -> unit
}

let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: unit -> unit) : unit =
while not token.IsCancellationRequested do
try
run()
with
| :? OperationCanceledException -> ()
| ex ->
ctx.logError ex
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
Thread.Sleep ctx.cooldown
let internal wrapRun (ctx: RestartContext) (runAsync: Async<unit>) : Async<unit> =
async {
while true do
try
do! runAsync
with
| :? OperationCanceledException -> return ()
| ex ->
ctx.logError ex
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
do! Async.Sleep(int ctx.cooldown.TotalMilliseconds)
}

let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) =
messageSystem.PutMessage message
Expand All @@ -44,14 +46,15 @@ type MessageSystemBase(ctx: RestartContext, cancellationToken: CancellationToken

/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member RunUntilError : IncomingMessageReceiver -> unit
abstract member RunUntilError : IncomingMessageReceiver -> Async<unit>

/// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then
/// will be restarted later.
abstract member Send : OutgoingMessage -> Async<unit>

interface IMessageSystem with
member ms.Run receiver =
wrapRun ctx cancellationToken (fun () -> this.RunUntilError receiver)
Async.RunSynchronously (wrapRun ctx (this.RunUntilError receiver), cancellationToken = cancellationToken)

member __.PutMessage message =
MessageSender.send sender message
2 changes: 1 addition & 1 deletion Emulsion/Telegram/Client.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Client(ctx: RestartContext, cancellationToken: CancellationToken, settings:
inherit MessageSystemBase(ctx, cancellationToken)

override __.RunUntilError receiver =
Funogram.run settings cancellationToken receiver
async { Funogram.run settings cancellationToken receiver }

override __.Send message =
Funogram.send settings message
29 changes: 23 additions & 6 deletions Emulsion/Xmpp/XmppClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ open System.Xml.Linq
open SharpXMPP
open SharpXMPP.XMPP

open System.Threading.Tasks
open Emulsion
open Emulsion.Settings

Expand Down Expand Up @@ -44,15 +45,31 @@ let create (settings: XmppSettings): XmppClient =
client.add_Presence(presenceHandler)
client

let run (settings: XmppSettings) (client: XmppClient) (onMessage: Message -> unit): unit =
exception ConnectionFailedError of string

let run (settings: XmppSettings) (client: XmppClient) (onMessage: Message -> unit): Async<unit> =
printfn "Bot name: %s" client.Jid.FullJid
let handler = messageHandler settings onMessage
let tcs = TaskCompletionSource()
let connectionFailedHandler =
XmppConnection.ConnectionFailedHandler(
fun _ error -> tcs.SetException(ConnectionFailedError error.Message)
)

async {
try
let! token = Async.CancellationToken
use _ = token.Register(fun () -> client.Close())

client.add_Message handler
client.add_ConnectionFailed connectionFailedHandler
client.Connect()

try
client.Connect()
client.add_Message handler
finally
client.remove_Message handler
do! Async.AwaitTask tcs.Task
finally
client.remove_ConnectionFailed connectionFailedHandler
client.remove_Message handler
}

let send (settings: XmppSettings) (client: XmppClient) (message: Message): unit =
let text = sprintf "<%s> %s" message.author message.text
Expand Down

0 comments on commit 5ff2e4d

Please sign in to comment.