Skip to content

Commit

Permalink
Start the new Telegram client (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
ForNeVeR committed Jun 15, 2019
1 parent d4b0a12 commit 9ceae8b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Emulsion.Tests/Actors/Telegram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type TelegramTest() =
let mutable sentMessage = None
let telegram = {
new IMessageSystem with
member __.Run _ _ = ()
member __.Run _ = ()
member __.PutMessage message =
sentMessage <- Some message
}
Expand Down
12 changes: 6 additions & 6 deletions Emulsion.Tests/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ open Emulsion.MessageSystem
let private performTest expectedStage runBody =
use cts = new CancellationTokenSource()
let mutable stage = 0
let run ct =
let run() =
stage <- stage + 1
runBody cts ct stage
runBody cts stage
let context = {
cooldown = TimeSpan.Zero
logError = ignore
Expand All @@ -24,7 +24,7 @@ let private performTest expectedStage runBody =

[<Fact>]
let ``wrapRun should restart the activity on error``() =
performTest 2 (fun cts _ stage ->
performTest 2 (fun cts stage ->
match stage with
| 1 -> raise <| Exception()
| 2 -> cts.Cancel()
Expand All @@ -33,14 +33,14 @@ let ``wrapRun should restart the activity on error``() =

[<Fact>]
let ``wrapRun should not restart on OperationCanceledException``() =
performTest 1 (fun cts ct _ ->
performTest 1 (fun cts _ ->
cts.Cancel()
ct.ThrowIfCancellationRequested()
cts.Token.ThrowIfCancellationRequested()
)

[<Fact>]
let ``wrapRun should not restart on token.Cancel()``() =
performTest 4 (fun cts _ stage ->
performTest 4 (fun cts stage ->
if stage > 3 then
cts.Cancel()
)
11 changes: 8 additions & 3 deletions Emulsion/MessageSender.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Emulsion.MessageSender

open System
open System.Threading

type MessageSenderContext = {
send: OutgoingMessage -> Async<unit>
Expand All @@ -18,13 +19,17 @@ let rec private sendRetryLoop ctx msg = async {
return! sendRetryLoop ctx msg
}

let activity(ctx: MessageSenderContext): MailboxProcessor<OutgoingMessage> = MailboxProcessor.Start(fun inbox ->
type Sender = MailboxProcessor<OutgoingMessage>
let private receiver ctx (inbox: Sender) =
let rec loop() = async {
let! msg = inbox.Receive()
do! sendRetryLoop ctx msg
return! loop()
}
loop()
)

let send(activity: MailboxProcessor<OutgoingMessage>): OutgoingMessage -> unit = activity.Post
let startActivity(ctx: MessageSenderContext, token: CancellationToken): Sender =
MailboxProcessor.Start(receiver ctx, token)

let send(activity: Sender): OutgoingMessage -> unit = activity.Post
// TODO[F]: Tests for this module.
18 changes: 9 additions & 9 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type IncomingMessageReceiver = Message -> unit
/// a queue and sends them when possible. Redirects the incoming messages to a function passed when starting the queue.
type IMessageSystem =
/// Starts the IM connection, manages reconnects. Never terminates unless cancelled.
abstract member Run : IncomingMessageReceiver -> CancellationToken -> unit
abstract member Run : IncomingMessageReceiver -> unit

/// Queues the message to be sent to the IM system when possible.
abstract member PutMessage : OutgoingMessage -> unit
Expand All @@ -20,10 +20,10 @@ type RestartContext = {
logMessage: string -> unit
}

let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: CancellationToken -> unit) : unit =
let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: unit -> unit) : unit =
while not token.IsCancellationRequested do
try
run token
run()
with
| :? OperationCanceledException -> ()
| ex ->
Expand All @@ -35,23 +35,23 @@ let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) =
messageSystem.PutMessage message

[<AbstractClass>]
type MessageSystemBase(restartContext: RestartContext) as this =
let sender = MessageSender.activity {
type MessageSystemBase(restartContext: RestartContext, cancellationToken: CancellationToken) as this =
let sender = MessageSender.startActivity({
send = this.Send
logError = restartContext.logError
cooldown = restartContext.cooldown
}
}, cancellationToken)

/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member Run : IncomingMessageReceiver -> CancellationToken -> unit
abstract member RunOnce : IncomingMessageReceiver -> unit

/// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then
/// will be restarted later.
abstract member Send : OutgoingMessage -> Async<unit>

interface IMessageSystem with
member ms.Run receiver token =
wrapRun restartContext token (this.Run receiver)
member ms.Run receiver =
wrapRun restartContext cancellationToken (fun () -> this.RunOnce receiver)
member __.PutMessage message =
MessageSender.send sender message
19 changes: 17 additions & 2 deletions Emulsion/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ open System.IO
open Akka.Actor
open Microsoft.Extensions.Configuration

open System.Threading
open Emulsion.Actors
open Emulsion.MessageSystem
open Emulsion.Settings
Expand All @@ -21,6 +22,15 @@ let private getConfiguration directory fileName =
let private logError = printfn "ERROR: %A"
let private logInfo = printfn "INFO : %s"

let private startMessageSystem (system: IMessageSystem) receiver =
Async.StartChild <| async {
do! Async.SwitchToNewThread()
try
system.Run receiver
with
| ex -> logError ex
}

let private startApp config =
async {
printfn "Prepare system..."
Expand All @@ -31,14 +41,19 @@ let private startApp config =
logError = logError
logMessage = logInfo
}
let! cancellationToken = Async.CancellationToken
let xmpp = Xmpp.Client.sharpXmpp config.xmpp
let telegram = Telegram.Client(restartContext, config.telegram)
let telegram = Telegram.Client(restartContext, cancellationToken, config.telegram)
let factories = { xmppFactory = Xmpp.spawn xmpp
telegramFactory = fun factory _ name -> Telegram.spawn telegram factory name } // TODO[F]: Change the architecture here so we don't need to ignore the `core` parameter.
printfn "Prepare Core..."
ignore <| Core.spawn factories system "core"
let core = Core.spawn factories system "core"
printfn "Starting message systems..."
let! telegram = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m))
printfn "Ready. Wait for termination..."
do! Async.AwaitTask system.WhenTerminated
printfn "Waiting for terminating of message systems..."
do! telegram
}

let private runApp app =
Expand Down
11 changes: 6 additions & 5 deletions Emulsion/Telegram/Client.fs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
namespace Emulsion.Telegram

open System.Threading

open Emulsion.MessageSystem
open Emulsion.Settings

type Client(restartContext: RestartContext, settings: TelegramSettings) =
inherit MessageSystemBase(restartContext)
type Client(restartContext: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) =
inherit MessageSystemBase(restartContext, cancellationToken)

override __.Run receiver _ =
// TODO[F]: Update Funogram and don't ignore the cancellation token here.
Funogram.run settings receiver
override __.RunOnce receiver =
Funogram.run settings cancellationToken receiver

override __.Send message =
Funogram.send settings message
6 changes: 5 additions & 1 deletion Emulsion/Telegram/Funogram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ open Funogram.Bot
open Funogram.Api
open Funogram.Types

open System.Threading
open Emulsion
open Emulsion.Settings

Expand Down Expand Up @@ -51,6 +52,9 @@ let send (settings : TelegramSettings) (OutgoingMessage content) : Async<unit> =
return processResult result
}

let run (settings : TelegramSettings) (onMessage : Emulsion.Message -> unit) : unit =
let run (settings: TelegramSettings)
(cancellationToken: CancellationToken)
(onMessage: Emulsion.Message -> unit) : unit =
// TODO[F]: Update Funogram and don't ignore the cancellation token here.
let config = { defaultConfig with Token = settings.token }
Bot.startBot config (updateArrived onMessage) None

0 comments on commit 9ceae8b

Please sign in to comment.