diff --git a/docs/src/Docs/UV/Async.md b/docs/src/Docs/UV/Async.md index 3ab6c5a..192217c 100644 --- a/docs/src/Docs/UV/Async.md +++ b/docs/src/Docs/UV/Async.md @@ -125,7 +125,7 @@ parameters {auto l : UVLoop} onConnection ac server = do putOutLn "Got a connection" client <- acceptTcp server - _ <- streamRead ac client $ \case + _ <- streamReadWrite ac client $ \case Done => pure (Just ()) Data val => bytesOut val >> write client val $> Nothing Err x => throw x @@ -139,6 +139,7 @@ parameters {auto l : UVLoop} Right srv => onConnection ac srv ignore $ onSignal SIGINT + putOutLn "Shutting down server..." cancel main : IO () diff --git a/src/IO/Async/Async.idr b/src/IO/Async/Async.idr index 63279e9..fd39cd3 100644 --- a/src/IO/Async/Async.idr +++ b/src/IO/Async/Async.idr @@ -70,6 +70,9 @@ data Prim : List Type -> Type -> Type where -- spawn a new fiber with the given computation Start : Async es a -> Prim es (Fiber es a) + -- out cancel state that can be used when spawning a new fiber. + Self : Prim es (MVar CancelState) + -- repeatedly poll the given `IO` action until int yields a `Just` Poll : IO (Maybe $ Outcome es a) -> Prim es a @@ -99,6 +102,7 @@ primType (CB f) = "CB" primType (Start x) = "Start of \{type x}" primType (Poll x) = "Poll" primType Cancel = "Cancel" +primType Self = "Self" type (AP c x) = "AP(\{disp c}) of \{primType x}" type (AB c x f) = "AB(\{disp c}) of \{type x}" @@ -150,11 +154,22 @@ throw = fail . inject -- Cancelling fibers -------------------------------------------------------------------------------- -||| Sets the given computation's cancelability to "cancelable". +||| Sets the given computation's cancelability to "cancelable" if it +||| is currently at "parent". +||| +||| If you plan to enforce cancelability no matter what, use +||| `strictCancelable`. export cancelable : Async es a -> Async es a -cancelable (AP _ x) = AP C x -cancelable (AB _ x f) = AB C x f +cancelable (AP P x) = AP C x +cancelable (AB P x f) = AB C x f +cancelable x = x + +||| Sets the given computation's cancelability to "cancelable". +export +strictCancelable : Async es a -> Async es a +strictCancelable (AP _ x) = AP C x +strictCancelable (AB _ x f) = AB C x f ||| Sets the given computation's cancelability to "uncancelable". export @@ -204,7 +219,7 @@ HasIO (Async es) where export catch : (HSum es -> Async fs a) -> Async es a -> Async fs a catch f as = - AB U as $ \case + AB U (cancelable as) $ \case Succeeded a => pure a Error err => f err Canceled => canceled @@ -235,7 +250,7 @@ guarantee : -> (cleanup : Outcome es a -> Async [] ()) -> Async es a guarantee as fun = - AB U as (\o => AB U (fun o) (\_ => sync $ pure o)) + AB U (cancelable as) (\o => AB U (fun o) (\_ => sync $ pure o)) ||| Guarantees to run the given cleanup hook in case a fiber ||| has been canceled. @@ -375,6 +390,10 @@ export %inline raceEither : (x : Async es a) -> (y : Async es b) -> Async es (Either a b) raceEither x y = race (map Left x) (map Right y) +export %inline +self : Async es (MVar CancelState) +self = AP P Self + -------------------------------------------------------------------------------- -- Evaluation -------------------------------------------------------------------------------- @@ -491,6 +510,8 @@ parameters {auto ctxt : AsyncContext} | MkIORes (Just v) w2 => MkIORes (Done b v) w2 in MkIORes (Cede b . AP c $ Poll x) w2 + prim m b c Self w = MkIORes (Done b $ Succeeded m) w + prim m b c Cancel w = MkIORes (Done True Canceled) w step : @@ -527,13 +548,22 @@ parameters {auto ctxt : AsyncContext} -- Running asynchronous computations -------------------------------------------------------------------------------- +export +childOnAsync : + {auto ctxt : AsyncContext} + -> (parent : Maybe $ MVar CancelState) + -> Async es a + -> (Outcome es a -> IO ()) + -> IO () +childOnAsync parent as cb = do + fb <- newFiber parent + eval (EST fb (guarantee as (liftIO . cb)) []) + ||| Runs the given asynchronous computation to completion ||| invoking the given callback once it is done. -export +export %inline onAsync : AsyncContext => Async es a -> (Outcome es a -> IO ()) -> IO () -onAsync as cb = do - fb <- newFiber Nothing - eval (EST fb (guarantee as (liftIO . cb)) []) +onAsync = childOnAsync Nothing ||| Asynchronously runs the given computation to completion. export %inline diff --git a/src/IO/Async/MVar.idr b/src/IO/Async/MVar.idr index 7200806..13d150b 100644 --- a/src/IO/Async/MVar.idr +++ b/src/IO/Async/MVar.idr @@ -118,126 +118,3 @@ complete d v = do Nothing => (Just v, True) conditionBroadcast d.cond pure b - --------------------------------------------------------------------------------- --- MQueue --------------------------------------------------------------------------------- - -||| A thread-safe work queue. -export -record MQueue a where - [noHints] - constructor MQ - queue : MVar (Queue a) - cond : Condition - -export -newMQueue : IO (MQueue a) -newMQueue = [| MQ (newMVar empty) makeCondition |] - -export -enqueue : MQueue a -> a -> IO () -enqueue q v = modifyMVar q.queue (`enqueue` v) >> conditionSignal q.cond - -export -enqueueAll : MQueue a -> List a -> IO () -enqueueAll q [] = pure () -enqueueAll q vs = modifyMVar q.queue (`enqueueAll` vs) >> conditionSignal q.cond - -covering -dequeue' : Bool -> MQueue a -> IO a -dequeue' b q = do - when b (mutexAcquire q.queue.lock) - q1 <- readIORef q.queue.ref - case dequeue q1 of - Nothing => do - conditionWait q.cond q.queue.lock - dequeue' False q - Just (v,q2) => do - writeIORef q.queue.ref q2 - mutexRelease q.queue.lock - pure v - -export covering %inline -dequeue : MQueue a -> IO a -dequeue = dequeue' True - --------------------------------------------------------------------------------- --- Worker --------------------------------------------------------------------------------- --- --- export --- record WorkST a where --- constructor W --- stopped : IORef Bool --- queue : MQueue a --- act : (a -> IO ()) -> a -> IO () --- --- export --- newWorkST : ((a -> IO ()) -> a -> IO ()) -> IO (WorkST a) --- newWorkST fun = [| W (newIORef False) newMQueue (pure fun) |] --- --- namespace WorkST --- export --- stop : WorkST a -> IO () --- stop w = do --- mutexAcquire w.queue.queue.lock --- writeIORef w.stopped True --- conditionBroadcast w.queue.cond --- mutexRelease w.queue.queue.lock --- --- export %inline --- submit : a -> WorkST a -> IO () --- submit v w = enqueue w.queue v --- --- export %inline --- submitAll : List a -> WorkST a -> IO () --- submitAll vs w = enqueueAll w.queue vs --- --- covering --- process' : Bool -> WorkST a -> IO () --- process' b w = do --- when b (mutexAcquire w.queue.queue.lock) --- q1 <- readIORef w.queue.queue.ref --- case dequeue q1 of --- Nothing => do --- False <- readIORef w.stopped --- | True => mutexRelease w.queue.queue.lock --- conditionWait w.queue.cond w.queue.queue.lock --- process' False w --- --- Just (v,q2) => do --- writeIORef w.queue.queue.ref q2 --- mutexRelease w.queue.queue.lock --- w.act (`submit` w) v --- process' True w --- --- export covering %inline --- process : WorkST a -> IO () --- process = process' True --- --- export --- record WorkPool a where --- constructor WP --- st : WorkST a --- ts : List ThreadID --- --- export covering --- newWorkPool : (n : Nat) -> ((a -> IO ()) -> a -> IO ()) -> IO (WorkPool a) --- newWorkPool n fun = do --- w <- newWorkST fun --- ts <- traverse (\x => fork $ process w) [1..n] --- pure $ WP w ts --- --- namespace WorkPool --- export --- stop : WorkPool a -> IO () --- stop (WP st ts) = stop st >> traverse_ (\x => threadWait x >> putStrLn "Thread done") ts --- --- export %inline --- submit : a -> WorkPool a -> IO () --- submit v = submit v . st --- --- export %inline --- submitAll : List a -> WorkPool a -> IO () --- submitAll vs = submitAll vs . st diff --git a/src/System/UV/File.idr b/src/System/UV/File.idr index b07666b..8fe6b3c 100644 --- a/src/System/UV/File.idr +++ b/src/System/UV/File.idr @@ -171,7 +171,7 @@ parameters {auto l : UVLoop} readBytes : File -> Bits32 -> Async es ByteString readBytes f size = use1 (mallocPtrs Bits8 size) $ \cs => - cancelable $ uvAsync $ \cb => do + uvAsync $ \cb => do async (uv_fs_read l.loop f.file cs size (-1)) (readOutcome cs cb) export @@ -189,7 +189,7 @@ parameters {auto l : UVLoop} -> (ByteString -> Async es (Maybe b)) -> Async es (Maybe b) streamFileUntil {b} path size fun = - use1 (fsOpen path RDONLY 0) $ \h => cancelable $ go h + use1 (fsOpen path RDONLY 0) $ \h => go h where go : File -> Async es (Maybe b) go h = do diff --git a/src/System/UV/Loop.idr b/src/System/UV/Loop.idr index 12d8555..4607653 100644 --- a/src/System/UV/Loop.idr +++ b/src/System/UV/Loop.idr @@ -80,7 +80,7 @@ parameters {auto has : Has UVError es} -> Async es a uvOnce p close reg = finally - (cancelable $ async $ \cb => do + (async $ \cb => do n <- reg (cb . Succeeded) case uvRes n of Left err => cb (Error err) @@ -108,12 +108,14 @@ parameters {auto has : Has UVError es} -> Async es (Fiber es b) uvForever to p close reg = start $ finally - (cancelable $ forever $ \cb => do - n <- reg (\va => onAsync (to va) cb) - case uvRes n of - Left err => cb (Error err) - Right () => pure () - ) + (do + cc <- self + forever $ \cb => do + n <- reg (\va => childOnAsync (Just cc) (to va) cb) + case uvRes n of + Left err => cb (Error err) + Right () => pure () + ) (close p) export diff --git a/src/System/UV/Stream.idr b/src/System/UV/Stream.idr index 45c3700..eb4c690 100644 --- a/src/System/UV/Stream.idr +++ b/src/System/UV/Stream.idr @@ -34,6 +34,11 @@ export parameters {auto l : UVLoop} {auto has : Has UVError es} + close_stream : Ptr Stream -> Async [] () + close_stream x = do + uv_read_stop x + ignore (uv_shutdown x $ \_,_ => uv_close x %search) + export streamRead : AllocCB @@ -45,6 +50,17 @@ parameters {auto l : UVLoop} uvForever run h (\x => uv_read_stop x) $ \cb => uv_read_start h ac (\_,n,buf => toMsg n buf >>= cb) + export + streamReadWrite : + AllocCB + -> Ptr t + -> {auto 0 cstt : PCast t Stream} + -> (ReadRes ByteString -> Async es (Maybe a)) + -> Async es (Fiber es a) + streamReadWrite ac h run = do + uvForever run h (close_stream . castPtr) $ \cb => + uv_read_start h ac (\_,n,buf => toMsg n buf >>= cb) + export write : Ptr t @@ -58,10 +74,10 @@ parameters {auto l : UVLoop} export listen : Ptr t - -> {auto 0 cstt : PCast t Stream} + -> {auto 0 cst : PCast t Stream} -> (Either UVError (Ptr Stream) -> Async es (Maybe a)) -> Async es (Fiber es a) - listen server run = - uvForever run server (\_ => pure ()) $ \cb => + listen {cst} server run = + uvForever run server (release . castPtr @{cst}) $ \cb => uv_listen server 128 $ \p,res => cb $ if res < 0 then Left $ fromCode res else Right p