Skip to content

Commit

Permalink
[ fix ] shut down client connections gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-hoeck committed Jan 12, 2024
1 parent dca90bd commit f6e556a
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 145 deletions.
3 changes: 2 additions & 1 deletion docs/src/Docs/UV/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ parameters {auto l : UVLoop}
onConnection ac server = do
putOutLn "Got a connection"
client <- acceptTcp server
_ <- streamRead ac client $ \case
_ <- streamReadWrite ac client $ \case
Done => pure (Just ())
Data val => bytesOut val >> write client val $> Nothing
Err x => throw x
Expand All @@ -139,6 +139,7 @@ parameters {auto l : UVLoop}
Right srv => onConnection ac srv
ignore $ onSignal SIGINT
putOutLn "Shutting down server..."
cancel
main : IO ()
Expand Down
48 changes: 39 additions & 9 deletions src/IO/Async/Async.idr
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ data Prim : List Type -> Type -> Type where
-- spawn a new fiber with the given computation
Start : Async es a -> Prim es (Fiber es a)

-- out cancel state that can be used when spawning a new fiber.
Self : Prim es (MVar CancelState)

-- repeatedly poll the given `IO` action until int yields a `Just`
Poll : IO (Maybe $ Outcome es a) -> Prim es a

Expand Down Expand Up @@ -99,6 +102,7 @@ primType (CB f) = "CB"
primType (Start x) = "Start of \{type x}"
primType (Poll x) = "Poll"
primType Cancel = "Cancel"
primType Self = "Self"

type (AP c x) = "AP(\{disp c}) of \{primType x}"
type (AB c x f) = "AB(\{disp c}) of \{type x}"
Expand Down Expand Up @@ -150,11 +154,22 @@ throw = fail . inject
-- Cancelling fibers
--------------------------------------------------------------------------------

||| Sets the given computation's cancelability to "cancelable".
||| Sets the given computation's cancelability to "cancelable" if it
||| is currently at "parent".
|||
||| If you plan to enforce cancelability no matter what, use
||| `strictCancelable`.
export
cancelable : Async es a -> Async es a
cancelable (AP _ x) = AP C x
cancelable (AB _ x f) = AB C x f
cancelable (AP P x) = AP C x
cancelable (AB P x f) = AB C x f
cancelable x = x

||| Sets the given computation's cancelability to "cancelable".
export
strictCancelable : Async es a -> Async es a
strictCancelable (AP _ x) = AP C x
strictCancelable (AB _ x f) = AB C x f

||| Sets the given computation's cancelability to "uncancelable".
export
Expand Down Expand Up @@ -204,7 +219,7 @@ HasIO (Async es) where
export
catch : (HSum es -> Async fs a) -> Async es a -> Async fs a
catch f as =
AB U as $ \case
AB U (cancelable as) $ \case
Succeeded a => pure a
Error err => f err
Canceled => canceled
Expand Down Expand Up @@ -235,7 +250,7 @@ guarantee :
-> (cleanup : Outcome es a -> Async [] ())
-> Async es a
guarantee as fun =
AB U as (\o => AB U (fun o) (\_ => sync $ pure o))
AB U (cancelable as) (\o => AB U (fun o) (\_ => sync $ pure o))

||| Guarantees to run the given cleanup hook in case a fiber
||| has been canceled.
Expand Down Expand Up @@ -375,6 +390,10 @@ export %inline
raceEither : (x : Async es a) -> (y : Async es b) -> Async es (Either a b)
raceEither x y = race (map Left x) (map Right y)

export %inline
self : Async es (MVar CancelState)
self = AP P Self

--------------------------------------------------------------------------------
-- Evaluation
--------------------------------------------------------------------------------
Expand Down Expand Up @@ -491,6 +510,8 @@ parameters {auto ctxt : AsyncContext}
| MkIORes (Just v) w2 => MkIORes (Done b v) w2
in MkIORes (Cede b . AP c $ Poll x) w2

prim m b c Self w = MkIORes (Done b $ Succeeded m) w

prim m b c Cancel w = MkIORes (Done True Canceled) w

step :
Expand Down Expand Up @@ -527,13 +548,22 @@ parameters {auto ctxt : AsyncContext}
-- Running asynchronous computations
--------------------------------------------------------------------------------

export
childOnAsync :
{auto ctxt : AsyncContext}
-> (parent : Maybe $ MVar CancelState)
-> Async es a
-> (Outcome es a -> IO ())
-> IO ()
childOnAsync parent as cb = do
fb <- newFiber parent
eval (EST fb (guarantee as (liftIO . cb)) [])

||| Runs the given asynchronous computation to completion
||| invoking the given callback once it is done.
export
export %inline
onAsync : AsyncContext => Async es a -> (Outcome es a -> IO ()) -> IO ()
onAsync as cb = do
fb <- newFiber Nothing
eval (EST fb (guarantee as (liftIO . cb)) [])
onAsync = childOnAsync Nothing

||| Asynchronously runs the given computation to completion.
export %inline
Expand Down
123 changes: 0 additions & 123 deletions src/IO/Async/MVar.idr
Original file line number Diff line number Diff line change
Expand Up @@ -118,126 +118,3 @@ complete d v = do
Nothing => (Just v, True)
conditionBroadcast d.cond
pure b

--------------------------------------------------------------------------------
-- MQueue
--------------------------------------------------------------------------------

||| A thread-safe work queue.
export
record MQueue a where
[noHints]
constructor MQ
queue : MVar (Queue a)
cond : Condition

export
newMQueue : IO (MQueue a)
newMQueue = [| MQ (newMVar empty) makeCondition |]

export
enqueue : MQueue a -> a -> IO ()
enqueue q v = modifyMVar q.queue (`enqueue` v) >> conditionSignal q.cond

export
enqueueAll : MQueue a -> List a -> IO ()
enqueueAll q [] = pure ()
enqueueAll q vs = modifyMVar q.queue (`enqueueAll` vs) >> conditionSignal q.cond

covering
dequeue' : Bool -> MQueue a -> IO a
dequeue' b q = do
when b (mutexAcquire q.queue.lock)
q1 <- readIORef q.queue.ref
case dequeue q1 of
Nothing => do
conditionWait q.cond q.queue.lock
dequeue' False q
Just (v,q2) => do
writeIORef q.queue.ref q2
mutexRelease q.queue.lock
pure v

export covering %inline
dequeue : MQueue a -> IO a
dequeue = dequeue' True

--------------------------------------------------------------------------------
-- Worker
--------------------------------------------------------------------------------
--
-- export
-- record WorkST a where
-- constructor W
-- stopped : IORef Bool
-- queue : MQueue a
-- act : (a -> IO ()) -> a -> IO ()
--
-- export
-- newWorkST : ((a -> IO ()) -> a -> IO ()) -> IO (WorkST a)
-- newWorkST fun = [| W (newIORef False) newMQueue (pure fun) |]
--
-- namespace WorkST
-- export
-- stop : WorkST a -> IO ()
-- stop w = do
-- mutexAcquire w.queue.queue.lock
-- writeIORef w.stopped True
-- conditionBroadcast w.queue.cond
-- mutexRelease w.queue.queue.lock
--
-- export %inline
-- submit : a -> WorkST a -> IO ()
-- submit v w = enqueue w.queue v
--
-- export %inline
-- submitAll : List a -> WorkST a -> IO ()
-- submitAll vs w = enqueueAll w.queue vs
--
-- covering
-- process' : Bool -> WorkST a -> IO ()
-- process' b w = do
-- when b (mutexAcquire w.queue.queue.lock)
-- q1 <- readIORef w.queue.queue.ref
-- case dequeue q1 of
-- Nothing => do
-- False <- readIORef w.stopped
-- | True => mutexRelease w.queue.queue.lock
-- conditionWait w.queue.cond w.queue.queue.lock
-- process' False w
--
-- Just (v,q2) => do
-- writeIORef w.queue.queue.ref q2
-- mutexRelease w.queue.queue.lock
-- w.act (`submit` w) v
-- process' True w
--
-- export covering %inline
-- process : WorkST a -> IO ()
-- process = process' True
--
-- export
-- record WorkPool a where
-- constructor WP
-- st : WorkST a
-- ts : List ThreadID
--
-- export covering
-- newWorkPool : (n : Nat) -> ((a -> IO ()) -> a -> IO ()) -> IO (WorkPool a)
-- newWorkPool n fun = do
-- w <- newWorkST fun
-- ts <- traverse (\x => fork $ process w) [1..n]
-- pure $ WP w ts
--
-- namespace WorkPool
-- export
-- stop : WorkPool a -> IO ()
-- stop (WP st ts) = stop st >> traverse_ (\x => threadWait x >> putStrLn "Thread done") ts
--
-- export %inline
-- submit : a -> WorkPool a -> IO ()
-- submit v = submit v . st
--
-- export %inline
-- submitAll : List a -> WorkPool a -> IO ()
-- submitAll vs = submitAll vs . st
4 changes: 2 additions & 2 deletions src/System/UV/File.idr
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ parameters {auto l : UVLoop}
readBytes : File -> Bits32 -> Async es ByteString
readBytes f size =
use1 (mallocPtrs Bits8 size) $ \cs =>
cancelable $ uvAsync $ \cb => do
uvAsync $ \cb => do
async (uv_fs_read l.loop f.file cs size (-1)) (readOutcome cs cb)

export
Expand All @@ -189,7 +189,7 @@ parameters {auto l : UVLoop}
-> (ByteString -> Async es (Maybe b))
-> Async es (Maybe b)
streamFileUntil {b} path size fun =
use1 (fsOpen path RDONLY 0) $ \h => cancelable $ go h
use1 (fsOpen path RDONLY 0) $ \h => go h
where
go : File -> Async es (Maybe b)
go h = do
Expand Down
16 changes: 9 additions & 7 deletions src/System/UV/Loop.idr
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ parameters {auto has : Has UVError es}
-> Async es a
uvOnce p close reg =
finally
(cancelable $ async $ \cb => do
(async $ \cb => do
n <- reg (cb . Succeeded)
case uvRes n of
Left err => cb (Error err)
Expand Down Expand Up @@ -108,12 +108,14 @@ parameters {auto has : Has UVError es}
-> Async es (Fiber es b)
uvForever to p close reg =
start $ finally
(cancelable $ forever $ \cb => do
n <- reg (\va => onAsync (to va) cb)
case uvRes n of
Left err => cb (Error err)
Right () => pure ()
)
(do
cc <- self
forever $ \cb => do
n <- reg (\va => childOnAsync (Just cc) (to va) cb)
case uvRes n of
Left err => cb (Error err)
Right () => pure ()
)
(close p)

export
Expand Down
22 changes: 19 additions & 3 deletions src/System/UV/Stream.idr
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export
parameters {auto l : UVLoop}
{auto has : Has UVError es}

close_stream : Ptr Stream -> Async [] ()
close_stream x = do
uv_read_stop x
ignore (uv_shutdown x $ \_,_ => uv_close x %search)

export
streamRead :
AllocCB
Expand All @@ -45,6 +50,17 @@ parameters {auto l : UVLoop}
uvForever run h (\x => uv_read_stop x) $ \cb =>
uv_read_start h ac (\_,n,buf => toMsg n buf >>= cb)

export
streamReadWrite :
AllocCB
-> Ptr t
-> {auto 0 cstt : PCast t Stream}
-> (ReadRes ByteString -> Async es (Maybe a))
-> Async es (Fiber es a)
streamReadWrite ac h run = do
uvForever run h (close_stream . castPtr) $ \cb =>
uv_read_start h ac (\_,n,buf => toMsg n buf >>= cb)

export
write :
Ptr t
Expand All @@ -58,10 +74,10 @@ parameters {auto l : UVLoop}
export
listen :
Ptr t
-> {auto 0 cstt : PCast t Stream}
-> {auto 0 cst : PCast t Stream}
-> (Either UVError (Ptr Stream) -> Async es (Maybe a))
-> Async es (Fiber es a)
listen server run =
uvForever run server (\_ => pure ()) $ \cb =>
listen {cst} server run =
uvForever run server (release . castPtr @{cst}) $ \cb =>
uv_listen server 128 $ \p,res =>
cb $ if res < 0 then Left $ fromCode res else Right p

0 comments on commit f6e556a

Please sign in to comment.