Skip to content

Commit

Permalink
Brand new IMessageSystem-based Telegram client (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
ForNeVeR committed Jun 15, 2019
1 parent f04df8f commit d4b0a12
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 38 deletions.
15 changes: 9 additions & 6 deletions Emulsion.Tests/Actors/Telegram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ open Xunit

open Emulsion
open Emulsion.Actors
open Emulsion.Telegram
open Emulsion.MessageSystem

type TelegramTest() =
inherit TestKit()

[<Fact>]
member this.``Telegram actor should pass an outgoing message to the Telegram module``() =
member this.``Telegram actor should pass an outgoing message to the Telegram client``() =
let mutable sentMessage = None
let telegram : Client =
{ run = fun _ -> ()
send = fun msg -> sentMessage <- Some msg }
let actor = Telegram.spawn telegram this.Sys this.TestActor "telegram"
let telegram = {
new IMessageSystem with
member __.Run _ _ = ()
member __.PutMessage message =
sentMessage <- Some message
}
let actor = Telegram.spawn telegram this.Sys "telegram"
let msg = OutgoingMessage { author = "x"; text = "message" }
actor.Tell(msg, this.TestActor)
this.ExpectNoMsg()
Expand Down
22 changes: 6 additions & 16 deletions Emulsion/Actors/Telegram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,14 @@
open Akka.Actor

open Emulsion
open Emulsion.Telegram
open Emulsion.MessageSystem

type TelegramActor(core : IActorRef, telegram : Client) as this =
inherit SyncTaskWatcher()
type TelegramActor(telegram: IMessageSystem) as this =
inherit ReceiveActor()
do printfn "Starting Telegram actor (%A)..." this.Self.Path
do this.Receive<OutgoingMessage>(this.OnMessage)
do this.Receive<OutgoingMessage>(MessageSystem.putMessage telegram)

override this.RunInTask() =
printfn "Starting Telegram connection..."
telegram.run (fun message -> core.Tell(TelegramMessage message))

member private __.OnMessage message : unit =
telegram.send message

let spawn (telegram : Client)
(factory : IActorRefFactory)
(core : IActorRef)
(name : string) : IActorRef =
let spawn (telegram: IMessageSystem) (factory: IActorRefFactory) (name: string): IActorRef =
printfn "Spawning Telegram..."
let props = Props.Create<TelegramActor>(core, telegram)
let props = Props.Create<TelegramActor>(telegram)
factory.ActorOf(props, name)
7 changes: 5 additions & 2 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Emulsion.MessageSystem
open System
open System.Threading

type IncomingMessageReceiver = IncomingMessage -> unit
type IncomingMessageReceiver = Message -> unit

/// The IM message queue. Manages the underlying connection, reconnects when necessary, stores the outgoing messages in
/// a queue and sends them when possible. Redirects the incoming messages to a function passed when starting the queue.
Expand All @@ -20,7 +20,7 @@ type RestartContext = {
logMessage: string -> unit
}

let wrapRun (ctx: RestartContext) (token: CancellationToken) (run: CancellationToken -> unit) : unit =
let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: CancellationToken -> unit) : unit =
while not token.IsCancellationRequested do
try
run token
Expand All @@ -31,6 +31,9 @@ let wrapRun (ctx: RestartContext) (token: CancellationToken) (run: CancellationT
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
Thread.Sleep ctx.cooldown

let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) =
messageSystem.PutMessage message

[<AbstractClass>]
type MessageSystemBase(restartContext: RestartContext) as this =
let sender = MessageSender.activity {
Expand Down
14 changes: 12 additions & 2 deletions Emulsion/Program.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
module Emulsion.Program

open System
open System.IO

open Akka.Actor
open Microsoft.Extensions.Configuration

open Emulsion.Actors
open Emulsion.MessageSystem
open Emulsion.Settings

let private getConfiguration directory fileName =
Expand All @@ -16,15 +18,23 @@ let private getConfiguration directory fileName =
.Build()
Settings.read config

let private logError = printfn "ERROR: %A"
let private logInfo = printfn "INFO : %s"

let private startApp config =
async {
printfn "Prepare system..."
use system = ActorSystem.Create("emulsion")
printfn "Prepare factories..."
let restartContext = {
cooldown = TimeSpan.FromSeconds(30.0) // TODO[F]: Customize through the config.
logError = logError
logMessage = logInfo
}
let xmpp = Xmpp.Client.sharpXmpp config.xmpp
let telegram = Telegram.Client.funogram config.telegram
let telegram = Telegram.Client(restartContext, config.telegram)
let factories = { xmppFactory = Xmpp.spawn xmpp
telegramFactory = Telegram.spawn telegram }
telegramFactory = fun factory _ name -> Telegram.spawn telegram factory name } // TODO[F]: Change the architecture here so we don't need to ignore the `core` parameter.
printfn "Prepare Core..."
ignore <| Core.spawn factories system "core"
printfn "Ready. Wait for termination..."
Expand Down
17 changes: 9 additions & 8 deletions Emulsion/Telegram/Client.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
namespace Emulsion.Telegram

open Emulsion
open Emulsion.MessageSystem
open Emulsion.Settings

type Client =
{ run : (Emulsion.Message -> unit) -> unit
send : OutgoingMessage -> unit }
type Client(restartContext: RestartContext, settings: TelegramSettings) =
inherit MessageSystemBase(restartContext)

with
static member funogram (settings : TelegramSettings) : Client =
{ run = Funogram.run settings
send = Funogram.send settings }
override __.Run receiver _ =
// TODO[F]: Update Funogram and don't ignore the cancellation token here.
Funogram.run settings receiver

override __.Send message =
Funogram.send settings message
9 changes: 5 additions & 4 deletions Emulsion/Telegram/Funogram.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ let private updateArrived onMessage (ctx : UpdateContext) =
let internal prepareHtmlMessage { author = author; text = text } : string =
sprintf "<b>%s</b>\n%s" (Html.escape author) (Html.escape text)

let send (settings : TelegramSettings) (OutgoingMessage content) : unit =
let send (settings : TelegramSettings) (OutgoingMessage content) : Async<unit> =
let sendHtmlMessage groupId text =
sendMessageBase groupId text (Some ParseMode.HTML) None None None None

let groupId = Int (int64 settings.groupId)
let message = prepareHtmlMessage content
api settings.token (sendHtmlMessage groupId message)
|> Async.RunSynchronously
|> processResult
async {
let! result = api settings.token (sendHtmlMessage groupId message)
return processResult result
}

let run (settings : TelegramSettings) (onMessage : Emulsion.Message -> unit) : unit =
let config = { defaultConfig with Token = settings.token }
Expand Down

0 comments on commit d4b0a12

Please sign in to comment.