Skip to content

Commit

Permalink
Add a message sender mailbox processor, wire it with the message syst…
Browse files Browse the repository at this point in the history
…em (#29)
  • Loading branch information
ForNeVeR committed Jun 13, 2019
1 parent bf0c80d commit f04df8f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 20 deletions.
3 changes: 1 addition & 2 deletions Emulsion.Tests/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ let private performTest expectedStage runBody =
stage <- stage + 1
runBody cts ct stage
let context = {
token = cts.Token
cooldown = TimeSpan.Zero
logError = ignore
logMessage = ignore
}
MessageSystem.wrapRun context run
MessageSystem.wrapRun context cts.Token run
Assert.Equal(expectedStage, stage)

[<Fact>]
Expand Down
1 change: 1 addition & 0 deletions Emulsion/Emulsion.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<Compile Include="AssemblyInfo.fs" />
<Compile Include="EmulsionSettings.fs" />
<Compile Include="Message.fs" />
<Compile Include="MessageSender.fs" />
<Compile Include="MessageSystem.fs" />
<Compile Include="Telegram\Html.fs" />
<Compile Include="Telegram\Funogram.fs" />
Expand Down
30 changes: 30 additions & 0 deletions Emulsion/MessageSender.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module Emulsion.MessageSender

open System

type MessageSenderContext = {
send: OutgoingMessage -> Async<unit>
logError: Exception -> unit
cooldown: TimeSpan
}

let rec private sendRetryLoop ctx msg = async {
try
do! ctx.send msg
with
| ex ->
ctx.logError ex
do! Async.Sleep(int ctx.cooldown.TotalMilliseconds)
return! sendRetryLoop ctx msg
}

let activity(ctx: MessageSenderContext): MailboxProcessor<OutgoingMessage> = MailboxProcessor.Start(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
do! sendRetryLoop ctx msg
return! loop()
}
loop()
)

let send(activity: MailboxProcessor<OutgoingMessage>): OutgoingMessage -> unit = activity.Post
45 changes: 27 additions & 18 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
@@ -1,45 +1,54 @@
module Emulsion.MessageSystem

open System
open System.Collections.Concurrent
open System.Threading

type IncomingMessageReceiver = IncomingMessage -> unit

/// The IM message queue. Manages the underlying connection, reconnects when necessary, stores the outgoing messages in
/// a queue and sends them when possible. Redirects the incoming messages to a function passed when starting the queue.
type IMessageSystem =
/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member Run : CancellationToken -> IncomingMessageReceiver -> unit
/// Starts the IM connection, manages reconnects. Never terminates unless cancelled.
abstract member Run : IncomingMessageReceiver -> CancellationToken -> unit

/// Queues the message to be sent to the IM system when possible.
abstract member PutMessage : OutgoingMessage -> unit

[<AbstractClass>]
type MessageSystemBase() =
let queue = ConcurrentQueue<OutgoingMessage>()
abstract member Run : CancellationToken -> ConcurrentQueue<OutgoingMessage> -> IncomingMessageReceiver -> unit
interface IMessageSystem with
member this.Run token receiver =
this.Run token queue receiver
member __.PutMessage message =
queue.Enqueue message

type RestartContext = {
token: CancellationToken
cooldown: TimeSpan
logError: Exception -> unit
logMessage: string -> unit
}

let wrapRun (ctx: RestartContext) (run: CancellationToken -> unit) : unit =
while not ctx.token.IsCancellationRequested do
let wrapRun (ctx: RestartContext) (token: CancellationToken) (run: CancellationToken -> unit) : unit =
while not token.IsCancellationRequested do
try
run ctx.token
run token
with
| :? OperationCanceledException -> ()
| ex ->
ctx.logError ex
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
Thread.Sleep ctx.cooldown

[<AbstractClass>]
type MessageSystemBase(restartContext: RestartContext) as this =
let sender = MessageSender.activity {
send = this.Send
logError = restartContext.logError
cooldown = restartContext.cooldown
}

/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member Run : IncomingMessageReceiver -> CancellationToken -> unit

/// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then
/// will be restarted later.
abstract member Send : OutgoingMessage -> Async<unit>

interface IMessageSystem with
member ms.Run receiver token =
wrapRun restartContext token (this.Run receiver)
member __.PutMessage message =
MessageSender.send sender message

0 comments on commit f04df8f

Please sign in to comment.