Skip to content

Commit

Permalink
IMessageSystem implementation for XMPP (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
ForNeVeR committed Jul 8, 2019
1 parent d37e5c8 commit e9d3c00
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Emulsion.Tests/Actors/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type CoreTests() as this =
let mutable actorsCreated = 0
let xmppActor = this.CreateTestProbe "xmpp"
let telegramActor = this.CreateTestProbe "telegram"
let testActorFactory _ _ name =
let testActorFactory _ name =
actorsCreated <- actorsCreated + 1
match name with
| "xmpp" -> xmppActor.Ref
Expand Down
23 changes: 12 additions & 11 deletions Emulsion.Tests/Actors/Xmpp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ open Xunit

open Emulsion
open Emulsion.Actors
open Emulsion.Xmpp
open Emulsion.Tests
open Emulsion.MessageSystem

type XmppTest() =
inherit TestKit()

[<Fact>]
member this.``XMPP actor should pass an outgoing message to the XMPP module``() =
let sentMessage = ref ""
let xmppSettings = Settings.testConfiguration.xmpp
let xmpp : Client =
{ construct = (Client.sharpXmpp xmppSettings).construct
run = fun _ -> ()
send = fun _ msg -> sentMessage := msg }
let actor = Xmpp.spawn xmpp this.Sys this.TestActor "xmpp"
actor.Tell(OutgoingMessage { author = "@nickname"; text = "message" }, this.TestActor)
let mutable sentMessage = None
let xmpp : IMessageSystem = {
new IMessageSystem with
member __.Run _ = ()
member __.PutMessage message =
sentMessage <- Some message
}
let actor = Xmpp.spawn xmpp this.Sys "xmpp"
let message = OutgoingMessage { author = "@nickname"; text = "message" }
actor.Tell(message, this.TestActor)
this.ExpectNoMsg()
Assert.Equal("<@nickname> message", !sentMessage)
Assert.Equal(Some message, sentMessage)
4 changes: 2 additions & 2 deletions Emulsion.Tests/Xmpp/SharpXmppHelper.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ let ``parseMessage should extract message text and author``() =
let text = "text test"
let element = XmppMessageFactory.create("x@y/author", text)
let message = SharpXmppHelper.parseMessage element
let expected = XmppMessage { author = "author"; text = text }
let expected = { author = "author"; text = text }
Assert.Equal(expected, message)

[<Fact>]
let ``Message without author is attributed to [UNKNOWN USER]``() =
let text = "xxx"
let element = XmppMessageFactory.create(text = text)
let message = SharpXmppHelper.parseMessage element
let expected = XmppMessage { author = "[UNKNOWN USER]"; text = text }
let expected = { author = "[UNKNOWN USER]"; text = text }
Assert.Equal(expected, message)

[<Fact>]
Expand Down
2 changes: 1 addition & 1 deletion Emulsion/Actors/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type CoreActor(factories : ActorFactories) as this =
let mutable telegram = Unchecked.defaultof<IActorRef>

member private this.spawn (factory : ActorFactory) name =
factory ActorBase.Context this.Self name
factory ActorBase.Context name

override this.PreStart() =
printfn "Starting Core actor..."
Expand Down
2 changes: 1 addition & 1 deletion Emulsion/Actors/Factories.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Emulsion.Actors.Factories

open Akka.Actor

type ActorFactory = IActorRefFactory -> IActorRef -> string -> IActorRef
type ActorFactory = IActorRefFactory -> string -> IActorRef
type ActorFactories =
{ xmppFactory : ActorFactory
telegramFactory : ActorFactory }
26 changes: 8 additions & 18 deletions Emulsion/Actors/Xmpp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,16 @@ module Emulsion.Actors.Xmpp
open Akka.Actor

open Emulsion
open Emulsion.Xmpp
open Emulsion.MessageSystem

type XmppActor(core : IActorRef, xmpp : Client) as this =
inherit SyncTaskWatcher()
type XmppActor(xmpp: IMessageSystem) as this =
inherit ReceiveActor()
do printfn "Starting XMPP actor (%A)..." this.Self.Path
do this.Receive<OutgoingMessage>(this.OnMessage)
let robot = xmpp.construct core
do this.Receive<OutgoingMessage>(MessageSystem.putMessage xmpp)

override __.RunInTask() =
printfn "Starting XMPP connection..."
xmpp.run robot

member private __.OnMessage(OutgoingMessage { author = author; text = text }) : unit =
let msg = sprintf "<%s> %s" author text
xmpp.send robot msg

let spawn (xmpp : Client)
(factory : IActorRefFactory)
(core : IActorRef)
(name : string) =
let spawn (xmpp: IMessageSystem)
(factory: IActorRefFactory)
(name: string) =
printfn "Spawning XMPP..."
let props = Props.Create<XmppActor>(core, xmpp)
let props = Props.Create<XmppActor>(xmpp)
factory.ActorOf(props, name)
10 changes: 5 additions & 5 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) =
messageSystem.PutMessage message

[<AbstractClass>]
type MessageSystemBase(restartContext: RestartContext, cancellationToken: CancellationToken) as this =
type MessageSystemBase(ctx: RestartContext, cancellationToken: CancellationToken) as this =
let sender = MessageSender.startActivity({
send = this.Send
logError = restartContext.logError
cooldown = restartContext.cooldown
logError = ctx.logError
cooldown = ctx.cooldown
}, cancellationToken)

/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member RunOnce : IncomingMessageReceiver -> unit
abstract member RunUntilError : IncomingMessageReceiver -> unit

/// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then
/// will be restarted later.
abstract member Send : OutgoingMessage -> Async<unit>

interface IMessageSystem with
member ms.Run receiver =
wrapRun restartContext cancellationToken (fun () -> this.RunOnce receiver)
wrapRun ctx cancellationToken (fun () -> this.RunUntilError receiver)
member __.PutMessage message =
MessageSender.send sender message
10 changes: 6 additions & 4 deletions Emulsion/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ let private startApp config =
logMessage = logInfo
}
let! cancellationToken = Async.CancellationToken
let xmpp = Xmpp.Client.sharpXmpp config.xmpp
let xmpp = Xmpp.Client(restartContext, cancellationToken, config.xmpp)
let telegram = Telegram.Client(restartContext, cancellationToken, config.telegram)
let factories = { xmppFactory = Xmpp.spawn xmpp
telegramFactory = fun factory _ name -> Telegram.spawn telegram factory name } // TODO[F]: Change the architecture here so we don't need to ignore the `core` parameter.
telegramFactory = Telegram.spawn telegram }
printfn "Prepare Core..."
let core = Core.spawn factories system "core"
printfn "Starting message systems..."
let! telegram = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m))
let! telegramSystem = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m))
let! xmppSystem = startMessageSystem xmpp (fun m -> core.Tell(XmppMessage m))
printfn "Ready. Wait for termination..."
do! Async.AwaitTask system.WhenTerminated
printfn "Waiting for terminating of message systems..."
do! telegram
do! telegramSystem
do! xmppSystem
}

let private runApp app =
Expand Down
6 changes: 3 additions & 3 deletions Emulsion/Telegram/Client.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ open System.Threading
open Emulsion.MessageSystem
open Emulsion.Settings

type Client(restartContext: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) =
inherit MessageSystemBase(restartContext, cancellationToken)
type Client(ctx: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) =
inherit MessageSystemBase(ctx, cancellationToken)

override __.RunOnce receiver =
override __.RunUntilError receiver =
Funogram.run settings cancellationToken receiver

override __.Send message =
Expand Down
24 changes: 11 additions & 13 deletions Emulsion/Xmpp/Client.fs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
namespace Emulsion.Xmpp

open Akka.Actor
open SharpXMPP
open System.Threading

open Emulsion
open Emulsion.MessageSystem
open Emulsion.Settings

type Client =
{ construct : IActorRef -> XmppClient
run : XmppClient -> unit
send : XmppClient -> string -> unit }
type Client(ctx: RestartContext, cancellationToken: CancellationToken, settings: XmppSettings) as this =
inherit MessageSystemBase(ctx, cancellationToken)
let client = XmppClient.create settings

with
static member private create settings (core : IActorRef) =
XmppClient.create settings core.Tell
override __.RunUntilError receiver =
XmppClient.run settings client receiver

static member sharpXmpp (settings : XmppSettings) : Client =
{ construct = Client.create settings
run = XmppClient.run
send = XmppClient.send settings }
override __.Send (OutgoingMessage message) = async {
return XmppClient.send settings client message
}
4 changes: 2 additions & 2 deletions Emulsion/Xmpp/SharpXmppHelper.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ let isHistoricalMessage (message : XMPPMessage) : bool =
|> Seq.isEmpty
)

let parseMessage (message : XMPPMessage) : IncomingMessage =
let parseMessage (message: XMPPMessage): Message =
let nickname =
getAttributeValue message From
|> Option.map getResource
|> Option.defaultValue "[UNKNOWN USER]"
XmppMessage { author = nickname; text = message.Text }
{ author = nickname; text = message.Text }
18 changes: 12 additions & 6 deletions Emulsion/Xmpp/XmppClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,25 @@ let private elementHandler = XmppConnection.ElementHandler(fun s e ->
let private presenceHandler = XmppConnection.PresenceHandler(fun s e ->
printfn "[P]: %A" e)

let create (settings : XmppSettings) (onMessage : IncomingMessage -> unit) : XmppClient =
let create (settings: XmppSettings): XmppClient =
let client = XmppClient(JID(settings.login), settings.password)
client.add_ConnectionFailed(connectionFailedHandler)
client.add_SignedIn(signedInHandler settings client)
client.add_Message(messageHandler settings onMessage)
client.add_Element(elementHandler)
client.add_Presence(presenceHandler)
client

let run (client : XmppClient) =
let run (settings: XmppSettings) (client: XmppClient) (onMessage: Message -> unit): unit =
printfn "Bot name: %s" client.Jid.FullJid
client.Connect()
let handler = messageHandler settings onMessage

let send (settings : XmppSettings) (client : XmppClient) (message : string) : unit =
SharpXmppHelper.message settings.room message
try
client.Connect()
client.add_Message handler
finally
client.remove_Message handler

let send (settings: XmppSettings) (client: XmppClient) (message: Message): unit =
let text = sprintf "<%s> %s" message.author message.text
SharpXmppHelper.message settings.room text
|> client.Send

0 comments on commit e9d3c00

Please sign in to comment.