Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IMessageSystem implementation for XMPP #39

Merged
merged 6 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Emulsion.Tests/Actors/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type CoreTests() as this =
let mutable actorsCreated = 0
let xmppActor = this.CreateTestProbe "xmpp"
let telegramActor = this.CreateTestProbe "telegram"
let testActorFactory _ _ name =
let testActorFactory _ name =
actorsCreated <- actorsCreated + 1
match name with
| "xmpp" -> xmppActor.Ref
Expand Down
50 changes: 0 additions & 50 deletions Emulsion.Tests/Actors/SyncTaskWatcher.fs

This file was deleted.

23 changes: 12 additions & 11 deletions Emulsion.Tests/Actors/Xmpp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ open Xunit

open Emulsion
open Emulsion.Actors
open Emulsion.Xmpp
open Emulsion.Tests
open Emulsion.MessageSystem

type XmppTest() =
inherit TestKit()

[<Fact>]
member this.``XMPP actor should pass an outgoing message to the XMPP module``() =
let sentMessage = ref ""
let xmppSettings = Settings.testConfiguration.xmpp
let xmpp : Client =
{ construct = (Client.sharpXmpp xmppSettings).construct
run = fun _ -> ()
send = fun _ msg -> sentMessage := msg }
let actor = Xmpp.spawn xmpp this.Sys this.TestActor "xmpp"
actor.Tell(OutgoingMessage { author = "@nickname"; text = "message" }, this.TestActor)
let mutable sentMessage = None
let xmpp = {
new IMessageSystem with
ForNeVeR marked this conversation as resolved.
Show resolved Hide resolved
member __.Run _ = ()
member __.PutMessage message =
sentMessage <- Some message
}
let actor = Xmpp.spawn xmpp this.Sys "xmpp"
let message = OutgoingMessage { author = "@nickname"; text = "message" }
actor.Tell(message, this.TestActor)
this.ExpectNoMsg()
Assert.Equal("<@nickname> message", !sentMessage)
Assert.Equal(Some message, sentMessage)
1 change: 0 additions & 1 deletion Emulsion.Tests/Emulsion.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
<Compile Include="MessageSenderTests.fs" />
<Compile Include="MessageSystemTests.fs" />
<Compile Include="Actors/Core.fs" />
<Compile Include="Actors/SyncTaskWatcher.fs" />
<Compile Include="Actors/Telegram.fs" />
<Compile Include="Actors/Xmpp.fs" />
<Compile Include="Xmpp\XmppMessageFactory.fs" />
Expand Down
10 changes: 8 additions & 2 deletions Emulsion.Tests/MessageSystemTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@ open Emulsion.MessageSystem
let private performTest expectedStage runBody =
use cts = new CancellationTokenSource()
let mutable stage = 0
let run() =
let run = async {
stage <- stage + 1
runBody cts stage
}
let context = {
cooldown = TimeSpan.Zero
logError = ignore
logMessage = ignore
}
MessageSystem.wrapRun context cts.Token run

try
Async.RunSynchronously(MessageSystem.wrapRun context run, cancellationToken = cts.Token)
with
| :? OperationCanceledException -> ()

Assert.Equal(expectedStage, stage)

[<Fact>]
Expand Down
4 changes: 2 additions & 2 deletions Emulsion.Tests/Xmpp/SharpXmppHelper.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ let ``parseMessage should extract message text and author``() =
let text = "text test"
let element = XmppMessageFactory.create("x@y/author", text)
let message = SharpXmppHelper.parseMessage element
let expected = XmppMessage { author = "author"; text = text }
let expected = { author = "author"; text = text }
Assert.Equal(expected, message)

[<Fact>]
let ``Message without author is attributed to [UNKNOWN USER]``() =
let text = "xxx"
let element = XmppMessageFactory.create(text = text)
let message = SharpXmppHelper.parseMessage element
let expected = XmppMessage { author = "[UNKNOWN USER]"; text = text }
let expected = { author = "[UNKNOWN USER]"; text = text }
Assert.Equal(expected, message)

[<Fact>]
Expand Down
2 changes: 1 addition & 1 deletion Emulsion/Actors/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type CoreActor(factories : ActorFactories) as this =
let mutable telegram = Unchecked.defaultof<IActorRef>

member private this.spawn (factory : ActorFactory) name =
factory ActorBase.Context this.Self name
factory ActorBase.Context name

override this.PreStart() =
printfn "Starting Core actor..."
Expand Down
2 changes: 1 addition & 1 deletion Emulsion/Actors/Factories.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Emulsion.Actors.Factories

open Akka.Actor

type ActorFactory = IActorRefFactory -> IActorRef -> string -> IActorRef
type ActorFactory = IActorRefFactory -> string -> IActorRef
type ActorFactories =
{ xmppFactory : ActorFactory
telegramFactory : ActorFactory }
25 changes: 0 additions & 25 deletions Emulsion/Actors/SyncTaskWatcher.fs

This file was deleted.

26 changes: 8 additions & 18 deletions Emulsion/Actors/Xmpp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,16 @@ module Emulsion.Actors.Xmpp
open Akka.Actor

open Emulsion
open Emulsion.Xmpp
open Emulsion.MessageSystem

type XmppActor(core : IActorRef, xmpp : Client) as this =
inherit SyncTaskWatcher()
type XmppActor(xmpp: IMessageSystem) as this =
inherit ReceiveActor()
do printfn "Starting XMPP actor (%A)..." this.Self.Path
do this.Receive<OutgoingMessage>(this.OnMessage)
let robot = xmpp.construct core
do this.Receive<OutgoingMessage>(MessageSystem.putMessage xmpp)

override __.RunInTask() =
printfn "Starting XMPP connection..."
xmpp.run robot

member private __.OnMessage(OutgoingMessage { author = author; text = text }) : unit =
let msg = sprintf "<%s> %s" author text
xmpp.send robot msg

let spawn (xmpp : Client)
(factory : IActorRefFactory)
(core : IActorRef)
(name : string) =
let spawn (xmpp: IMessageSystem)
(factory: IActorRefFactory)
(name: string): IActorRef =
printfn "Spawning XMPP..."
let props = Props.Create<XmppActor>(core, xmpp)
let props = Props.Create<XmppActor>(xmpp)
factory.ActorOf(props, name)
1 change: 0 additions & 1 deletion Emulsion/Emulsion.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<Compile Include="Xmpp/XmppClient.fs" />
<Compile Include="Xmpp\Client.fs" />
<Compile Include="Actors/Factories.fs" />
<Compile Include="Actors/SyncTaskWatcher.fs" />
<Compile Include="Actors/Core.fs" />
<Compile Include="Actors/Telegram.fs" />
<Compile Include="Actors/Xmpp.fs" />
Expand Down
33 changes: 18 additions & 15 deletions Emulsion/MessageSystem.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,41 @@ type RestartContext = {
logMessage: string -> unit
}

let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: unit -> unit) : unit =
while not token.IsCancellationRequested do
try
run()
with
| :? OperationCanceledException -> ()
| ex ->
ctx.logError ex
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
Thread.Sleep ctx.cooldown
let internal wrapRun (ctx: RestartContext) (runAsync: Async<unit>) : Async<unit> =
async {
while true do
try
do! runAsync
with
| :? OperationCanceledException -> return ()
| ex ->
ctx.logError ex
ctx.logMessage <| sprintf "Waiting for %A to restart" ctx.cooldown
do! Async.Sleep(int ctx.cooldown.TotalMilliseconds)
}

let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) =
messageSystem.PutMessage message

[<AbstractClass>]
type MessageSystemBase(restartContext: RestartContext, cancellationToken: CancellationToken) as this =
type MessageSystemBase(ctx: RestartContext, cancellationToken: CancellationToken) as this =
let sender = MessageSender.startActivity({
send = this.Send
logError = restartContext.logError
cooldown = restartContext.cooldown
logError = ctx.logError
cooldown = ctx.cooldown
}, cancellationToken)

/// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or
/// return a unit.
abstract member RunOnce : IncomingMessageReceiver -> unit
abstract member RunUntilError : IncomingMessageReceiver -> Async<unit>

/// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then
/// will be restarted later.
abstract member Send : OutgoingMessage -> Async<unit>

interface IMessageSystem with
member ms.Run receiver =
wrapRun restartContext cancellationToken (fun () -> this.RunOnce receiver)
Async.RunSynchronously (wrapRun ctx (this.RunUntilError receiver), cancellationToken = cancellationToken)

member __.PutMessage message =
MessageSender.send sender message
10 changes: 6 additions & 4 deletions Emulsion/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ let private startApp config =
logMessage = logInfo
}
let! cancellationToken = Async.CancellationToken
let xmpp = Xmpp.Client.sharpXmpp config.xmpp
let xmpp = Xmpp.Client(restartContext, cancellationToken, config.xmpp)
let telegram = Telegram.Client(restartContext, cancellationToken, config.telegram)
let factories = { xmppFactory = Xmpp.spawn xmpp
telegramFactory = fun factory _ name -> Telegram.spawn telegram factory name } // TODO[F]: Change the architecture here so we don't need to ignore the `core` parameter.
telegramFactory = Telegram.spawn telegram }
printfn "Prepare Core..."
let core = Core.spawn factories system "core"
printfn "Starting message systems..."
let! telegram = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m))
let! telegramSystem = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m))
let! xmppSystem = startMessageSystem xmpp (fun m -> core.Tell(XmppMessage m))
printfn "Ready. Wait for termination..."
do! Async.AwaitTask system.WhenTerminated
printfn "Waiting for terminating of message systems..."
do! telegram
do! telegramSystem
do! xmppSystem
}

let private runApp app =
Expand Down
8 changes: 4 additions & 4 deletions Emulsion/Telegram/Client.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ open System.Threading
open Emulsion.MessageSystem
open Emulsion.Settings

type Client(restartContext: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) =
inherit MessageSystemBase(restartContext, cancellationToken)
type Client(ctx: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) =
inherit MessageSystemBase(ctx, cancellationToken)

override __.RunOnce receiver =
Funogram.run settings cancellationToken receiver
override __.RunUntilError receiver =
async { Funogram.run settings cancellationToken receiver }
ForNeVeR marked this conversation as resolved.
Show resolved Hide resolved

override __.Send message =
Funogram.send settings message
24 changes: 11 additions & 13 deletions Emulsion/Xmpp/Client.fs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
namespace Emulsion.Xmpp

open Akka.Actor
open SharpXMPP
open System.Threading

open Emulsion
open Emulsion.MessageSystem
open Emulsion.Settings

type Client =
{ construct : IActorRef -> XmppClient
run : XmppClient -> unit
send : XmppClient -> string -> unit }
type Client(ctx: RestartContext, cancellationToken: CancellationToken, settings: XmppSettings) =
inherit MessageSystemBase(ctx, cancellationToken)
let client = XmppClient.create settings

with
static member private create settings (core : IActorRef) =
XmppClient.create settings core.Tell
override __.RunUntilError receiver =
XmppClient.run settings client receiver

static member sharpXmpp (settings : XmppSettings) : Client =
{ construct = Client.create settings
run = XmppClient.run
send = XmppClient.send settings }
override __.Send (OutgoingMessage message) = async {
return XmppClient.send settings client message
}
4 changes: 2 additions & 2 deletions Emulsion/Xmpp/SharpXmppHelper.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ let isHistoricalMessage (message : XMPPMessage) : bool =
|> Seq.isEmpty
)

let parseMessage (message : XMPPMessage) : IncomingMessage =
let parseMessage (message: XMPPMessage): Message =
let nickname =
getAttributeValue message From
|> Option.map getResource
|> Option.defaultValue "[UNKNOWN USER]"
XmppMessage { author = nickname; text = message.Text }
{ author = nickname; text = message.Text }
Loading