Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ fix ] shut down client connections gracefully #12

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/src/Docs/UV/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ parameters {auto l : UVLoop}
onConnection ac server = do
putOutLn "Got a connection"
client <- acceptTcp server
_ <- streamRead ac client $ \case
_ <- streamReadWrite ac client $ \case
Done => pure (Just ())
Data val => bytesOut val >> write client val $> Nothing
Err x => throw x
Expand All @@ -139,6 +139,7 @@ parameters {auto l : UVLoop}
Right srv => onConnection ac srv
ignore $ onSignal SIGINT
putOutLn "Shutting down server..."
cancel
main : IO ()
Expand Down
48 changes: 39 additions & 9 deletions src/IO/Async/Async.idr
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ data Prim : List Type -> Type -> Type where
-- spawn a new fiber with the given computation
Start : Async es a -> Prim es (Fiber es a)

-- out cancel state that can be used when spawning a new fiber.
Self : Prim es (MVar CancelState)

-- repeatedly poll the given `IO` action until int yields a `Just`
Poll : IO (Maybe $ Outcome es a) -> Prim es a

Expand Down Expand Up @@ -99,6 +102,7 @@ primType (CB f) = "CB"
primType (Start x) = "Start of \{type x}"
primType (Poll x) = "Poll"
primType Cancel = "Cancel"
primType Self = "Self"

type (AP c x) = "AP(\{disp c}) of \{primType x}"
type (AB c x f) = "AB(\{disp c}) of \{type x}"
Expand Down Expand Up @@ -150,11 +154,22 @@ throw = fail . inject
-- Cancelling fibers
--------------------------------------------------------------------------------

||| Sets the given computation's cancelability to "cancelable".
||| Sets the given computation's cancelability to "cancelable" if it
||| is currently at "parent".
|||
||| If you plan to enforce cancelability no matter what, use
||| `strictCancelable`.
export
cancelable : Async es a -> Async es a
cancelable (AP _ x) = AP C x
cancelable (AB _ x f) = AB C x f
cancelable (AP P x) = AP C x
cancelable (AB P x f) = AB C x f
cancelable x = x

||| Sets the given computation's cancelability to "cancelable".
export
strictCancelable : Async es a -> Async es a
strictCancelable (AP _ x) = AP C x
strictCancelable (AB _ x f) = AB C x f

||| Sets the given computation's cancelability to "uncancelable".
export
Expand Down Expand Up @@ -204,7 +219,7 @@ HasIO (Async es) where
export
catch : (HSum es -> Async fs a) -> Async es a -> Async fs a
catch f as =
AB U as $ \case
AB U (cancelable as) $ \case
Succeeded a => pure a
Error err => f err
Canceled => canceled
Expand Down Expand Up @@ -235,7 +250,7 @@ guarantee :
-> (cleanup : Outcome es a -> Async [] ())
-> Async es a
guarantee as fun =
AB U as (\o => AB U (fun o) (\_ => sync $ pure o))
AB U (cancelable as) (\o => AB U (fun o) (\_ => sync $ pure o))

||| Guarantees to run the given cleanup hook in case a fiber
||| has been canceled.
Expand Down Expand Up @@ -375,6 +390,10 @@ export %inline
raceEither : (x : Async es a) -> (y : Async es b) -> Async es (Either a b)
raceEither x y = race (map Left x) (map Right y)

export %inline
self : Async es (MVar CancelState)
self = AP P Self

--------------------------------------------------------------------------------
-- Evaluation
--------------------------------------------------------------------------------
Expand Down Expand Up @@ -491,6 +510,8 @@ parameters {auto ctxt : AsyncContext}
| MkIORes (Just v) w2 => MkIORes (Done b v) w2
in MkIORes (Cede b . AP c $ Poll x) w2

prim m b c Self w = MkIORes (Done b $ Succeeded m) w

prim m b c Cancel w = MkIORes (Done True Canceled) w

step :
Expand Down Expand Up @@ -527,13 +548,22 @@ parameters {auto ctxt : AsyncContext}
-- Running asynchronous computations
--------------------------------------------------------------------------------

export
childOnAsync :
{auto ctxt : AsyncContext}
-> (parent : Maybe $ MVar CancelState)
-> Async es a
-> (Outcome es a -> IO ())
-> IO ()
childOnAsync parent as cb = do
fb <- newFiber parent
eval (EST fb (guarantee as (liftIO . cb)) [])

||| Runs the given asynchronous computation to completion
||| invoking the given callback once it is done.
export
export %inline
onAsync : AsyncContext => Async es a -> (Outcome es a -> IO ()) -> IO ()
onAsync as cb = do
fb <- newFiber Nothing
eval (EST fb (guarantee as (liftIO . cb)) [])
onAsync = childOnAsync Nothing

||| Asynchronously runs the given computation to completion.
export %inline
Expand Down
123 changes: 0 additions & 123 deletions src/IO/Async/MVar.idr
Original file line number Diff line number Diff line change
Expand Up @@ -118,126 +118,3 @@ complete d v = do
Nothing => (Just v, True)
conditionBroadcast d.cond
pure b

--------------------------------------------------------------------------------
-- MQueue
--------------------------------------------------------------------------------

||| A thread-safe work queue.
export
record MQueue a where
[noHints]
constructor MQ
queue : MVar (Queue a)
cond : Condition

export
newMQueue : IO (MQueue a)
newMQueue = [| MQ (newMVar empty) makeCondition |]

export
enqueue : MQueue a -> a -> IO ()
enqueue q v = modifyMVar q.queue (`enqueue` v) >> conditionSignal q.cond

export
enqueueAll : MQueue a -> List a -> IO ()
enqueueAll q [] = pure ()
enqueueAll q vs = modifyMVar q.queue (`enqueueAll` vs) >> conditionSignal q.cond

covering
dequeue' : Bool -> MQueue a -> IO a
dequeue' b q = do
when b (mutexAcquire q.queue.lock)
q1 <- readIORef q.queue.ref
case dequeue q1 of
Nothing => do
conditionWait q.cond q.queue.lock
dequeue' False q
Just (v,q2) => do
writeIORef q.queue.ref q2
mutexRelease q.queue.lock
pure v

export covering %inline
dequeue : MQueue a -> IO a
dequeue = dequeue' True

--------------------------------------------------------------------------------
-- Worker
--------------------------------------------------------------------------------
--
-- export
-- record WorkST a where
-- constructor W
-- stopped : IORef Bool
-- queue : MQueue a
-- act : (a -> IO ()) -> a -> IO ()
--
-- export
-- newWorkST : ((a -> IO ()) -> a -> IO ()) -> IO (WorkST a)
-- newWorkST fun = [| W (newIORef False) newMQueue (pure fun) |]
--
-- namespace WorkST
-- export
-- stop : WorkST a -> IO ()
-- stop w = do
-- mutexAcquire w.queue.queue.lock
-- writeIORef w.stopped True
-- conditionBroadcast w.queue.cond
-- mutexRelease w.queue.queue.lock
--
-- export %inline
-- submit : a -> WorkST a -> IO ()
-- submit v w = enqueue w.queue v
--
-- export %inline
-- submitAll : List a -> WorkST a -> IO ()
-- submitAll vs w = enqueueAll w.queue vs
--
-- covering
-- process' : Bool -> WorkST a -> IO ()
-- process' b w = do
-- when b (mutexAcquire w.queue.queue.lock)
-- q1 <- readIORef w.queue.queue.ref
-- case dequeue q1 of
-- Nothing => do
-- False <- readIORef w.stopped
-- | True => mutexRelease w.queue.queue.lock
-- conditionWait w.queue.cond w.queue.queue.lock
-- process' False w
--
-- Just (v,q2) => do
-- writeIORef w.queue.queue.ref q2
-- mutexRelease w.queue.queue.lock
-- w.act (`submit` w) v
-- process' True w
--
-- export covering %inline
-- process : WorkST a -> IO ()
-- process = process' True
--
-- export
-- record WorkPool a where
-- constructor WP
-- st : WorkST a
-- ts : List ThreadID
--
-- export covering
-- newWorkPool : (n : Nat) -> ((a -> IO ()) -> a -> IO ()) -> IO (WorkPool a)
-- newWorkPool n fun = do
-- w <- newWorkST fun
-- ts <- traverse (\x => fork $ process w) [1..n]
-- pure $ WP w ts
--
-- namespace WorkPool
-- export
-- stop : WorkPool a -> IO ()
-- stop (WP st ts) = stop st >> traverse_ (\x => threadWait x >> putStrLn "Thread done") ts
--
-- export %inline
-- submit : a -> WorkPool a -> IO ()
-- submit v = submit v . st
--
-- export %inline
-- submitAll : List a -> WorkPool a -> IO ()
-- submitAll vs = submitAll vs . st
4 changes: 2 additions & 2 deletions src/System/UV/File.idr
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ parameters {auto l : UVLoop}
readBytes : File -> Bits32 -> Async es ByteString
readBytes f size =
use1 (mallocPtrs Bits8 size) $ \cs =>
cancelable $ uvAsync $ \cb => do
uvAsync $ \cb => do
async (uv_fs_read l.loop f.file cs size (-1)) (readOutcome cs cb)

export
Expand All @@ -189,7 +189,7 @@ parameters {auto l : UVLoop}
-> (ByteString -> Async es (Maybe b))
-> Async es (Maybe b)
streamFileUntil {b} path size fun =
use1 (fsOpen path RDONLY 0) $ \h => cancelable $ go h
use1 (fsOpen path RDONLY 0) $ \h => go h
where
go : File -> Async es (Maybe b)
go h = do
Expand Down
16 changes: 9 additions & 7 deletions src/System/UV/Loop.idr
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ parameters {auto has : Has UVError es}
-> Async es a
uvOnce p close reg =
finally
(cancelable $ async $ \cb => do
(async $ \cb => do
n <- reg (cb . Succeeded)
case uvRes n of
Left err => cb (Error err)
Expand Down Expand Up @@ -108,12 +108,14 @@ parameters {auto has : Has UVError es}
-> Async es (Fiber es b)
uvForever to p close reg =
start $ finally
(cancelable $ forever $ \cb => do
n <- reg (\va => onAsync (to va) cb)
case uvRes n of
Left err => cb (Error err)
Right () => pure ()
)
(do
cc <- self
forever $ \cb => do
n <- reg (\va => childOnAsync (Just cc) (to va) cb)
case uvRes n of
Left err => cb (Error err)
Right () => pure ()
)
(close p)

export
Expand Down
22 changes: 19 additions & 3 deletions src/System/UV/Stream.idr
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export
parameters {auto l : UVLoop}
{auto has : Has UVError es}

close_stream : Ptr Stream -> Async [] ()
close_stream x = do
uv_read_stop x
ignore (uv_shutdown x $ \_,_ => uv_close x %search)

export
streamRead :
AllocCB
Expand All @@ -45,6 +50,17 @@ parameters {auto l : UVLoop}
uvForever run h (\x => uv_read_stop x) $ \cb =>
uv_read_start h ac (\_,n,buf => toMsg n buf >>= cb)

export
streamReadWrite :
AllocCB
-> Ptr t
-> {auto 0 cstt : PCast t Stream}
-> (ReadRes ByteString -> Async es (Maybe a))
-> Async es (Fiber es a)
streamReadWrite ac h run = do
uvForever run h (close_stream . castPtr) $ \cb =>
uv_read_start h ac (\_,n,buf => toMsg n buf >>= cb)

export
write :
Ptr t
Expand All @@ -58,10 +74,10 @@ parameters {auto l : UVLoop}
export
listen :
Ptr t
-> {auto 0 cstt : PCast t Stream}
-> {auto 0 cst : PCast t Stream}
-> (Either UVError (Ptr Stream) -> Async es (Maybe a))
-> Async es (Fiber es a)
listen server run =
uvForever run server (\_ => pure ()) $ \cb =>
listen {cst} server run =
uvForever run server (release . castPtr @{cst}) $ \cb =>
uv_listen server 128 $ \p,res =>
cb $ if res < 0 then Left $ fromCode res else Right p
Loading