Skip to content

Commit

Permalink
Merge pull request #16 from j-alexander/binary_websocket
Browse files Browse the repository at this point in the history
add Binary(byte[]) WebSocket support
  • Loading branch information
j-alexander authored Apr 18, 2019
2 parents 2940aaa + c8647de commit 006c482
Show file tree
Hide file tree
Showing 8 changed files with 842 additions and 461 deletions.
124 changes: 124 additions & 0 deletions Nata.IO.WebSocket.Tests/Host.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
module Nata.IO.WebSocket.Tests.Host

open System
open System.Net
open System.Text
open System.Threading
open Suave
open Suave.Sockets
open Suave.Sockets.Control
open Suave.Successful
open Suave.Operators
open Suave.Writers
open Suave.WebSocket
open Suave.Filters

let countText =
handShake <| fun ws _ ->
let rec loop(i) =
socket {
do! Async.Sleep(1000)
|> Async.map Choice1Of2
let code = Opcode.Text
let data =
sprintf "%d" i
|> Encoding.UTF8.GetBytes
|> ByteSegment
do! ws.send code data true
return! loop(i+1)
}
socket {
let! _ = ws.read()
return! loop(0)
}

let countBinary =
handShake <| fun ws _ ->
let rec loop(i:int32) =
socket {
do! Async.Sleep(1000)
|> Async.map Choice1Of2
let code = Opcode.Binary
let data =
BitConverter.GetBytes(i)
|> ByteSegment
do! ws.send code data true
return! loop(i+1)
}
socket {
let! _ = ws.read()
return! loop(0)
}

let echo =
handShake <| fun ws _ ->
let rec loop() =
socket {
let! (code:Opcode, data:byte[], fin:bool) = ws.read()
match code with
| Opcode.Close ->
return ()
| Opcode.Text | Opcode.Binary ->
do! ws.send code (ByteSegment data) fin
return! loop()
| _ ->
return! loop()
}
loop()

let api : WebPart =
choose
[ GET >=> path "/count/text" >=> countText
GET >=> path "/count/binary" >=> countBinary
GET >=> pathStarts "/echo" >=> echo
GET >=> OK "hello" ]
>=> setMimeType "application/json; charset=utf-8"


type Command =
| Start
| Stop

type Service(port:uint16) =

let service = MailboxProcessor.Start(fun (inbox:MailboxProcessor<Command*AsyncReplyChannel<unit>>) ->
let rec waitForStart() =
async {
let! (message, sender) = inbox.Receive()
match message with
| Start ->
let source = new CancellationTokenSource()
let configuration =
{ defaultConfig with
bindings = [ HttpBinding.create HTTP IPAddress.Loopback port ]
cancellationToken = source.Token }
let started, service = startWebServerAsync configuration api
Async.Start(service,source.Token)
let! status = started
sender.Reply()
return! waitForStop(source)
| Stop ->
sender.Reply()
return! waitForStart()
}
and waitForStop(source:CancellationTokenSource) =
async {
let! (message, sender) = inbox.Receive()
match message with
| Stop ->
source.Cancel()
source.Dispose()
sender.Reply()
return! waitForStart()
| Start ->
sender.Reply()
return! waitForStop(source)
}
waitForStart())
do
service.Error.Add(fun ex ->
let message = ex.Message
())

member x.Start() = service.PostAndReply(fun x -> Start, x)
member x.Stop() = service.PostAndReply(fun x -> Stop, x)
1 change: 1 addition & 0 deletions Nata.IO.WebSocket.Tests/Nata.IO.WebSocket.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
</PropertyGroup>
<ItemGroup>
<None Include="paket.references" />
<Compile Include="Host.fs" />
<Compile Include="SocketTests.fs" />
</ItemGroup>
<ItemGroup>
Expand Down
86 changes: 80 additions & 6 deletions Nata.IO.WebSocket.Tests/SocketTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,93 @@ type SocketTests() =

let settings = { Host="ws://websocketstest.com:80/service"; AutoPingInterval=None }

let service = new Host.Service(9998us)
let echo, countText, countBinary =
{ Host="ws://localhost:9998/echo"; AutoPingInterval=None },
{ Host="ws://localhost:9998/count/text"; AutoPingInterval=None },
{ Host="ws://localhost:9998/count/binary"; AutoPingInterval=None }

[<SetUp>]
member x.SetUp() =
service.Start()

[<TearDown>]
member x.TearDown() =
service.Stop()

[<Test; MaxTime(10000)>]
member x.TestTimeService() =
let write, subscribe =
let subscription, write =
let socket = Socket.connect settings
writer socket,
subscriber socket
let subscription =
subscribe()
subscribe socket,
writer socket
write (Event.create "version,")
let time =
subscription
|> Seq.logi (fun i x -> if i = 0 then write (Event.create "timer,"))
|> Seq.take 3
|> Seq.last
|> Event.data
Assert.True(time.Contains(DateTime.UtcNow.Year.ToString()))
Assert.True(time.Contains(DateTime.UtcNow.Year.ToString()))

[<Test; MaxTime(30000)>]
member x.TestStringSubscriber() =
let subscription, write =
let socket = Socket.connect countText
subscribe socket,
writer socket
write(Event.create(String.Empty))
let results =
subscription
|> Seq.map (Event.data >> Int64.Parse)
|> Seq.take 5
|> Seq.toList
Assert.AreEqual([0L..4L], results)

[<Test; MaxTime(30000)>]
member x.TestBinarySubscriber() =
let subscription, write =
let socket = Socket.connectBinary countBinary
subscribe socket,
writer socket
write(Event.create([||]))
let results =
subscription
|> Seq.map (Event.data >> (fun xs -> BitConverter.ToInt32(xs, 0)))
|> Seq.take 5
|> Seq.toList
Assert.AreEqual([0L..4L], results)

[<Test; MaxTime(10000)>]
member x.TestStringReadWriteEcho() =
let subscription, write =
let socket = Socket.connect echo
subscribe socket,
writer socket
let data =
let random = new Random()
[| for i in 1..10 -> random.Next().ToString() |]
for x in data do write(Event.create x)
let results =
subscription
|> Seq.map Event.data
|> Seq.take data.Length
|> Seq.toArray
Assert.AreEqual(data, results)

[<Test; MaxTime(10000)>]
member x.TestBinaryReadWriteEcho() =
let subscription, write =
let socket = Socket.connectBinary echo
subscribe socket,
writer socket
let data =
let random = new Random()
[| for i in 1..10 -> BitConverter.GetBytes(random.Next()) |]
for x in data do write(Event.create x)
let results =
subscription
|> Seq.map Event.data
|> Seq.take data.Length
|> Seq.toArray
Assert.AreEqual(data, results)
3 changes: 2 additions & 1 deletion Nata.IO.WebSocket.Tests/paket.references
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ NUnit
FSharp.Core
System.ValueTuple
NUnit3TestAdapter
Microsoft.NET.Test.Sdk
Microsoft.NET.Test.Sdk
Suave
51 changes: 38 additions & 13 deletions Nata.IO.WebSocket/Socket.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ module Socket =
AutoPingInterval : int option
}

type Status =
type Status<'T> =
| Opened
| Received of string
| Received of 'T
| Closed of exn option

let connect : Source<Settings,string,unit> =
let inline private connectUsing (sender, receiver) : Source<Settings,'T,unit> =
function
| { Host=host; AutoPingInterval=autoPing } ->

let multicast = new Multicast<Status>()
let multicast = new Multicast<Status<'T>>()
let socket = new WebSocket(host)

match autoPing with
Expand All @@ -33,11 +33,10 @@ module Socket =
socket.EnableAutoSendPing <- true
socket.AutoSendPingInterval <- interval

socket.Opened.AddHandler(fun s e -> multicast.Publish(Opened))
socket.Closed.AddHandler(fun s e -> multicast.Publish(Closed(None)))
socket.Error.AddHandler(fun s e -> multicast.Publish(Closed(Some e.Exception)))
socket.MessageReceived.AddHandler(fun s e -> multicast.Publish(Received(e.Message)))

socket.Opened.AddHandler(fun _ _ -> multicast.Publish(Opened))
socket.Closed.AddHandler(fun _ _ -> multicast.Publish(Closed(None)))
socket.Error.AddHandler(fun _ e -> multicast.Publish(Closed(Some e.Exception)))
socket |> receiver multicast

let initialize =
new Lazy<unit>((fun () ->
Expand All @@ -48,11 +47,11 @@ module Socket =
|> Seq.iter ignore),
true)

let write(message:Event<string>) =
let write(message:Event<'T>) =
initialize.Force()
socket.Send(message |> Event.data)
message |> Event.data |> sender socket

let listen() =
let listen() : seq<Event<'T>> =
let events = multicast.Subscribe()
initialize.Force()
seq {
Expand All @@ -73,4 +72,30 @@ module Socket =
Writer <| write

Subscriber <| listen
]
]


let connect : Source<Settings,string,unit> =

let receiver (multicast:Multicast<Status<string>>)
(socket:WebSocket)=
socket.MessageReceived.AddHandler(fun _ e ->
multicast.Publish(Received(e.Message)))

let sender (socket:WebSocket) (message:string) =
socket.Send(message)

connectUsing(sender,receiver)


let connectBinary : Source<Settings,byte[],unit> =

let receiver (multicast:Multicast<Status<byte[]>>)
(socket:WebSocket) =
socket.DataReceived.AddHandler(fun _ e ->
multicast.Publish(Received(e.Data)))

let sender (socket:WebSocket) (message:byte[]) =
socket.Send(message,0,message.Length)

connectUsing(sender,receiver)
8 changes: 4 additions & 4 deletions Nata.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.28803.156
# Visual Studio 15
VisualStudioVersion = 15.0.28307.572
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".paket", ".paket", "{D5F93442-F280-45E1-9224-5BF707244CC8}"
ProjectSection(SolutionItems) = preProject
Expand Down Expand Up @@ -77,9 +77,9 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Nata.IO.Tests", "Nata.IO.Te
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Nata.Core.NLog", "Nata.Core.NLog\Nata.Core.NLog.fsproj", "{517BFD31-F1A9-436B-A94B-2A1FAFEA4C99}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Nata.Codec.FsPickler", "Nata.Codec.FsPickler\Nata.Codec.FsPickler.fsproj", "{6F589E26-552F-4754-B8B8-0D722866AEBA}"
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Nata.Codec.FsPickler", "Nata.Codec.FsPickler\Nata.Codec.FsPickler.fsproj", "{6F589E26-552F-4754-B8B8-0D722866AEBA}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Nata.Codec.FsPickler.Tests", "Nata.Codec.FsPickler.Tests\Nata.Codec.FsPickler.Tests.fsproj", "{225D0C16-CB90-4AA0-8B4B-7248D411A876}"
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Nata.Codec.FsPickler.Tests", "Nata.Codec.FsPickler.Tests\Nata.Codec.FsPickler.Tests.fsproj", "{225D0C16-CB90-4AA0-8B4B-7248D411A876}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
1 change: 1 addition & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ nuget NUnit.Console
nuget NUnit3TestAdapter
nuget Owin
nuget RabbitMQ.Client
nuget Suave
nuget System.ValueTuple
nuget WebSocket4Net
nuget WindowsAzure.Storage
Loading

0 comments on commit 006c482

Please sign in to comment.