-
Notifications
You must be signed in to change notification settings - Fork 3
/
MergeServer.hs
44 lines (37 loc) · 1.8 KB
/
MergeServer.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import Control.Monad.IO.Class (liftIO)
import Data.Function ((&))
import Network.Socket (Socket, close)
import Streamly.Data.Array (Array)
import Streamly.Data.Stream.Prelude (Stream)
import System.IO (Handle, withFile, IOMode(..))
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.FileSystem.Handle as Handle
import qualified Streamly.Network.Socket as Socket
import qualified Streamly.Network.Inet.TCP as TCP
import qualified Streamly.Unicode.Stream as Unicode
-- | Turn the bytes arriving on a socket into a stream of lines.
--
-- Each byte is decoded as a Latin-1 character, and the characters are
-- collected into one 'Array' per line, a line ending at a newline character.
-- Note that a line is buffered before it is emitted and no limit is applied
-- to that buffering; adding one would be safer.
readLines :: Socket -> Stream IO (Array Char)
readLines sk = Stream.foldMany lineFold chars
  where
    -- Raw socket bytes decoded to characters: Stream IO Char
    chars = Unicode.decodeLatin1 (Socket.read sk)
    -- Collects characters into an array, ending at a newline: one
    -- Array Char per application of the fold.
    lineFold = Fold.takeEndBy (== '\n') Array.write
-- | Line stream for one client connection, built with 'readLines'.
--
-- The socket's 'close' action is attached to the stream through
-- 'Stream.finallyIO' as its cleanup action.
recv :: Socket -> Stream IO (Array Char)
recv sk = readLines sk & Stream.finallyIO closeSocket
  where
    -- Cleanup action handed to 'Stream.finallyIO'.
    closeSocket = liftIO (close sk)
-- | Run a TCP server on port 8090 that merges lines from all clients into
-- the given file handle.
--
-- Multiple clients can connect and each send a stream of lines. Connections
-- are handled concurrently through 'recv', the incoming streams are merged at
-- line boundaries (each element of the merged stream is one line array), and
-- the merged result is encoded as Latin-1 and written to @file@.
server :: Handle -> IO ()
server file = mergedLines & toBytes & Stream.fold (Handle.write file)
  where
    -- Accepted sockets mapped to their line streams and combined with
    -- 'Stream.parConcatMap' ('Stream.eager' enabled): Stream IO (Array Char)
    mergedLines =
        Stream.parConcatMap (Stream.eager True) recv (TCP.accept 8090)
    -- Flatten each line array back into characters, then encode them as
    -- Latin-1 bytes: Stream IO (Array Char) -> Stream IO Word8
    toBytes = Unicode.encodeLatin1 . Stream.unfoldMany Array.reader
-- | Entry point: open @output.txt@ in append mode and run 'server' with the
-- resulting handle.
main :: IO ()
main = withFile "output.txt" AppendMode $ \out -> server out