Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message system implementation for Telegram #34

Merged
merged 7 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions Emulsion.Tests/Actors/Telegram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ open Xunit

open Emulsion
open Emulsion.Actors
open Emulsion.Telegram
open Emulsion.MessageSystem

type TelegramTest() =
inherit TestKit()

[<Fact>]
member this.``Telegram actor should pass an outgoing message to the Telegram module``() =
member this.``Telegram actor should pass an outgoing message to the Telegram client``() =
let mutable sentMessage = None
let telegram : Client =
{ run = fun _ -> ()
send = fun msg -> sentMessage <- Some msg }
let actor = Telegram.spawn telegram this.Sys this.TestActor "telegram"
let telegram = {
new IMessageSystem with
member __.Run _ = ()
member __.PutMessage message =
sentMessage <- Some message
}
let actor = Telegram.spawn telegram this.Sys "telegram"
let msg = OutgoingMessage { author = "x"; text = "message" }
actor.Tell(msg, this.TestActor)
this.ExpectNoMsg()
Expand Down
2 changes: 2 additions & 0 deletions Emulsion.Tests/Emulsion.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="Settings.fs" />
<Compile Include="MessageSenderTests.fs" />
<Compile Include="MessageSystemTests.fs" />
<Compile Include="Actors/Core.fs" />
<Compile Include="Actors/SyncTaskWatcher.fs" />
<Compile Include="Actors/Telegram.fs" />
Expand Down
59 changes: 59 additions & 0 deletions Emulsion.Tests/MessageSenderTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
module Emulsion.Tests.MessageSenderTests

open System
open System.Threading

open Xunit
open Emulsion
open Emulsion.MessageSender

let private testContext = {
send = fun _ -> async { return () }
logError = ignore
cooldown = TimeSpan.Zero
}

[<Fact>]
let ``Message sender sends the messages sequentially``() =
use cts = new CancellationTokenSource()
let messagesReceived = ResizeArray()
let context = {
testContext with
send = fun m -> async {
lock messagesReceived (fun () ->
messagesReceived.Add m
)
}
}
let sender = MessageSender.startActivity(context, cts.Token)

let messagesSent = [| 1..100 |] |> Array.map (fun i ->
OutgoingMessage {
author = "author"
text = string i
}
)
messagesSent |> Array.iter(MessageSender.send sender)

SpinWait.SpinUntil((fun () -> messagesReceived.Count = messagesSent.Length), TimeSpan.FromSeconds 30.0)
|> Assert.True

Assert.Equal(messagesSent, messagesReceived)

[<Fact>]
let ``Message sender should be cancellable``() =
use cts = new CancellationTokenSource()
let errors = ResizeArray()
let context = {
testContext with
send = fun _ -> failwith "Should not be called"
logError = fun e -> lock errors (fun () -> errors.Add e)
}
let sender = MessageSender.startActivity(context, cts.Token)
cts.Cancel()

let msg = OutgoingMessage { author = "author"; text = "xx" }
MessageSender.send sender msg

SpinWait.SpinUntil((fun () -> errors.Count > 0), TimeSpan.FromMilliseconds 100.0) |> ignore
Assert.Empty errors
46 changes: 46 additions & 0 deletions Emulsion.Tests/MessageSystemTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
module Emulsion.Tests.MessageSystemTests

open System
open System.Threading

open Xunit

open Emulsion
open Emulsion.MessageSystem

let private performTest expectedStage runBody =
use cts = new CancellationTokenSource()
let mutable stage = 0
let run() =
stage <- stage + 1
runBody cts stage
let context = {
cooldown = TimeSpan.Zero
logError = ignore
logMessage = ignore
}
MessageSystem.wrapRun context cts.Token run
Assert.Equal(expectedStage, stage)

[<Fact>]
let ``wrapRun should restart the activity on error``() =
performTest 2 (fun cts stage ->
match stage with
| 1 -> raise <| Exception()
| 2 -> cts.Cancel()
| _ -> failwith "Impossible"
)

[<Fact>]
let ``wrapRun should not restart on OperationCanceledException``() =
performTest 1 (fun cts _ ->
cts.Cancel()
cts.Token.ThrowIfCancellationRequested()
)

[<Fact>]
let ``wrapRun should not restart on token.Cancel()``() =
performTest 4 (fun cts stage ->
if stage > 3 then
cts.Cancel()
)
22 changes: 6 additions & 16 deletions Emulsion/Actors/Telegram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,14 @@
open Akka.Actor

open Emulsion
open Emulsion.Telegram
open Emulsion.MessageSystem

type TelegramActor(core : IActorRef, telegram : Client) as this =
inherit SyncTaskWatcher()
type TelegramActor(telegram: IMessageSystem) as this =
inherit ReceiveActor()
do printfn "Starting Telegram actor (%A)..." this.Self.Path
do this.Receive<OutgoingMessage>(this.OnMessage)
do this.Receive<OutgoingMessage>(MessageSystem.putMessage telegram)

override this.RunInTask() =
printfn "Starting Telegram connection..."
telegram.run (fun message -> core.Tell(TelegramMessage message))

member private __.OnMessage message : unit =
telegram.send message

let spawn (telegram : Client)
(factory : IActorRefFactory)
(core : IActorRef)
(name : string) : IActorRef =
let spawn (telegram: IMessageSystem) (factory: IActorRefFactory) (name: string): IActorRef =
printfn "Spawning Telegram..."
let props = Props.Create<TelegramActor>(core, telegram)
let props = Props.Create<TelegramActor>(telegram)
factory.ActorOf(props, name)
2 changes: 2 additions & 0 deletions Emulsion/Emulsion.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
<Compile Include="AssemblyInfo.fs" />
<Compile Include="EmulsionSettings.fs" />
<Compile Include="Message.fs" />
<Compile Include="MessageSender.fs" />
<Compile Include="MessageSystem.fs" />
<Compile Include="Telegram\Html.fs" />
<Compile Include="Telegram\Funogram.fs" />
<Compile Include="Telegram\Client.fs" />
Expand Down
34 changes: 34 additions & 0 deletions Emulsion/MessageSender.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Emulsion.MessageSender

open System
open System.Threading

type MessageSenderContext = {
send: OutgoingMessage -> Async<unit>
logError: Exception -> unit
cooldown: TimeSpan
}

let rec private sendRetryLoop ctx msg = async {
try
do! ctx.send msg
with
| ex ->
ctx.logError ex
do! Async.Sleep(int ctx.cooldown.TotalMilliseconds)
return! sendRetryLoop ctx msg
}

type Sender = MailboxProcessor<OutgoingMessage>
let private receiver ctx (inbox: Sender) =
let rec loop() = async {
let! msg = inbox.Receive()
do! sendRetryLoop ctx msg
return! loop()
}
loop()

let startActivity(ctx: MessageSenderContext, token: CancellationToken): Sender =
MailboxProcessor.Start(receiver ctx, token)

let send(activity: Sender): OutgoingMessage -> unit = activity.Post
57 changes: 57 additions & 0 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module Emulsion.MessageSystem

open System
open System.Threading

type IncomingMessageReceiver = Message -> unit

/// The IM message queue. Manages the underlying connection, reconnects when necessary, stores the outgoing messages in
/// a queue and sends them when possible. Redirects the incoming messages to a function passed when starting the queue.
type IMessageSystem =
/// Starts the IM connection, manages reconnects. Never terminates unless cancelled.
abstract member Run : IncomingMessageReceiver -> unit

/// Queues the message to be sent to the IM system when possible.
abstract member PutMessage : OutgoingMessage -> unit

type RestartContext = {
cooldown: TimeSpan
logError: Exception -> unit
logMessage: string -> unit
}

let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: unit -> unit) : unit =
while not token.IsCancellationRequested do
try
run()
with
| :? OperationCanceledException -> ()
| ex ->
ctx.logError ex
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
Thread.Sleep ctx.cooldown

let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) =
messageSystem.PutMessage message

[<AbstractClass>]
type MessageSystemBase(restartContext: RestartContext, cancellationToken: CancellationToken) as this =
let sender = MessageSender.startActivity({
send = this.Send
logError = restartContext.logError
cooldown = restartContext.cooldown
}, cancellationToken)

/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member RunOnce : IncomingMessageReceiver -> unit

/// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then
/// will be restarted later.
abstract member Send : OutgoingMessage -> Async<unit>

interface IMessageSystem with
member ms.Run receiver =
wrapRun restartContext cancellationToken (fun () -> this.RunOnce receiver)
member __.PutMessage message =
MessageSender.send sender message
31 changes: 28 additions & 3 deletions Emulsion/Program.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
module Emulsion.Program

open System
open System.IO

open Akka.Actor
open Microsoft.Extensions.Configuration

open System.Threading
open Emulsion.Actors
open Emulsion.MessageSystem
open Emulsion.Settings

let private getConfiguration directory fileName =
Expand All @@ -16,19 +19,41 @@ let private getConfiguration directory fileName =
.Build()
Settings.read config

let private logError = printfn "ERROR: %A"
let private logInfo = printfn "INFO : %s"

let private startMessageSystem (system: IMessageSystem) receiver =
Async.StartChild <| async {
do! Async.SwitchToNewThread()
try
system.Run receiver
with
| ex -> logError ex
}

let private startApp config =
async {
printfn "Prepare system..."
use system = ActorSystem.Create("emulsion")
printfn "Prepare factories..."
let restartContext = {
cooldown = TimeSpan.FromSeconds(30.0) // TODO[F]: Customize through the config.
ForNeVeR marked this conversation as resolved.
Show resolved Hide resolved
logError = logError
logMessage = logInfo
}
let! cancellationToken = Async.CancellationToken
let xmpp = Xmpp.Client.sharpXmpp config.xmpp
let telegram = Telegram.Client.funogram config.telegram
let telegram = Telegram.Client(restartContext, cancellationToken, config.telegram)
let factories = { xmppFactory = Xmpp.spawn xmpp
telegramFactory = Telegram.spawn telegram }
telegramFactory = fun factory _ name -> Telegram.spawn telegram factory name } // TODO[F]: Change the architecture here so we don't need to ignore the `core` parameter.
ForNeVeR marked this conversation as resolved.
Show resolved Hide resolved
printfn "Prepare Core..."
ignore <| Core.spawn factories system "core"
let core = Core.spawn factories system "core"
printfn "Starting message systems..."
let! telegram = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m))
printfn "Ready. Wait for termination..."
do! Async.AwaitTask system.WhenTerminated
printfn "Waiting for terminating of message systems..."
do! telegram
}

let private runApp app =
Expand Down
18 changes: 10 additions & 8 deletions Emulsion/Telegram/Client.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
namespace Emulsion.Telegram

open Emulsion
open System.Threading

open Emulsion.MessageSystem
open Emulsion.Settings

type Client =
{ run : (Emulsion.Message -> unit) -> unit
send : OutgoingMessage -> unit }
type Client(restartContext: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) =
inherit MessageSystemBase(restartContext, cancellationToken)

override __.RunOnce receiver =
Funogram.run settings cancellationToken receiver

with
static member funogram (settings : TelegramSettings) : Client =
{ run = Funogram.run settings
send = Funogram.send settings }
override __.Send message =
Funogram.send settings message
17 changes: 11 additions & 6 deletions Emulsion/Telegram/Funogram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ open Funogram.Bot
open Funogram.Api
open Funogram.Types

open System.Threading
open Emulsion
open Emulsion.Settings

Expand Down Expand Up @@ -40,16 +41,20 @@ let private updateArrived onMessage (ctx : UpdateContext) =
let internal prepareHtmlMessage { author = author; text = text } : string =
sprintf "<b>%s</b>\n%s" (Html.escape author) (Html.escape text)

let send (settings : TelegramSettings) (OutgoingMessage content) : unit =
let send (settings : TelegramSettings) (OutgoingMessage content) : Async<unit> =
let sendHtmlMessage groupId text =
sendMessageBase groupId text (Some ParseMode.HTML) None None None None

let groupId = Int (int64 settings.groupId)
let message = prepareHtmlMessage content
api settings.token (sendHtmlMessage groupId message)
|> Async.RunSynchronously
|> processResult

let run (settings : TelegramSettings) (onMessage : Emulsion.Message -> unit) : unit =
async {
let! result = api settings.token (sendHtmlMessage groupId message)
return processResult result
}

let run (settings: TelegramSettings)
(cancellationToken: CancellationToken)
(onMessage: Emulsion.Message -> unit) : unit =
// TODO[F]: Update Funogram and don't ignore the cancellation token here.
ForNeVeR marked this conversation as resolved.
Show resolved Hide resolved
let config = { defaultConfig with Token = settings.token }
Bot.startBot config (updateArrived onMessage) None