Skip to content

Commit

Permalink
[ refactor ] drive Async via an uv_async_t handle
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-hoeck committed Jan 12, 2024
1 parent b200d31 commit 1801c63
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 39 deletions.
2 changes: 2 additions & 0 deletions docs/src/Docs/UV/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ parameters {auto l : UVLoop}
idleExample : DocIO ()
idleExample = do
putStrLn "Hello World"
ref <- newIORef 0
putStrLn "Starting the counter"
counter <- onIdle (checkCounter ref)
res <- raceEither (onSignal SIGINT) (once 5000)
Expand Down
63 changes: 32 additions & 31 deletions src/IO/Async/Async.idr
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ data Prim : List Type -> Type -> Type where
-- out cancel state that can be used when spawning a new fiber.
Self : Prim es (MVar CancelState)

-- repeatedly poll the given `IO` action until int yields a `Just`
-- repeatedly poll the given `IO` action until it yields a `Just`
Poll : IO (Maybe $ Outcome es a) -> Prim es a

-- cancel the current fiber
Expand Down Expand Up @@ -319,17 +319,6 @@ export %inline
forever : ((Outcome es (Maybe a) -> IO ()) -> IO ()) -> Async es a
forever f = AP P $ CB f

||| Repeatedly invokes the given `IO` action until it returns
||| a `Just`.
|||
||| This will semantically block the current fiber.
|||
||| If you want to run this in the background without blocking,
||| wrap it with `start`.
export %inline
poll : IO (Maybe $ Outcome es a) -> Async es a
poll = AP P . Poll

||| Runs an asynchronous computation in the background on a new fiber.
|||
||| The resulting fiber can be canceled from the current fiber, and
Expand All @@ -351,14 +340,14 @@ export
liftIO $
for_ f.parent (\p => removeChild p f.token) >>
ignore (cancel f.canceled)
AB P (poll $ tryGet f.outcome) (\_ => succeed ())
AB P (AP P . Poll $ tryGet f.outcome) (\_ => succeed ())

||| Semantically blocks the current fiber until the target fiber
||| has produced a result, in which case we continue with that
||| result.
export
(.await) : Fiber es a -> Async es a
(.await) f = poll $ tryGet f.outcome
(.await) f = AP P . Poll $ tryGet f.outcome

||| Semantically blocks the current fiber until one
||| of the two given fibers has produced an outcome, in which
Expand All @@ -374,7 +363,7 @@ raceF x y = do
fx <- x
fy <- y
finally
(cancelable $ poll $
(cancelable $ AP P . Poll $
tryGet fx.outcome >>= \case
Nothing => tryGet fy.outcome
Just v => pure (Just v)
Expand Down Expand Up @@ -434,6 +423,9 @@ record AsyncContext where
||| computations (fibers)
submit : EvalST -> IO ()

||| Wake up the context:w
wakeup : IO ()

||| Maximal number of iterations before we cede evaluation
||| to the next fiber.
limit : Nat
Expand All @@ -450,6 +442,7 @@ data PrimRes : (es : List Type) -> Type -> Type where
Done : Bool -> Outcome es a -> PrimRes es a

data EvalRes : (es : List Type) -> Type -> Type where
ECont : Bool -> Async es a -> Stack es fs a b -> EvalRes fs b
ECede : Bool -> Async es a -> Stack es fs a b -> EvalRes fs b
EDone : Bool -> Outcome es a -> EvalRes es a

Expand All @@ -470,19 +463,21 @@ set : Cancelability -> Async es a -> Async es a
set x (AP y z) = AP (x <+> y) z
set x (AB y z f) = AB (x <+> y) z f

runDeferred :
((Outcome es (Maybe a) -> IO ()) -> IO ())
-> Deferred (Outcome es a)
-> IO ()
runDeferred f x =
f $ \case
Succeeded (Just a) => ignore $ complete x (Succeeded a)
Succeeded Nothing => pure ()
Canceled => ignore $ complete x Canceled
Error err => ignore $ complete x (Error err)

parameters {auto ctxt : AsyncContext}

runDeferred :
((Outcome es (Maybe a) -> IO ()) -> IO ())
-> Deferred (Outcome es a)
-> IO ()
runDeferred f x =
f $ \y => do
case y of
Succeeded (Just a) => ignore $ complete x (Succeeded a)
Succeeded Nothing => pure ()
Canceled => ignore $ complete x Canceled
Error err => ignore $ complete x (Error err)
ctxt.wakeup

prim :
MVar CancelState
-> Bool
Expand All @@ -503,7 +498,8 @@ parameters {auto ctxt : AsyncContext}
prim m b c (Start x) w =
let MkIORes fbr w2 := toPrim (newFiber (Just m)) w
MkIORes () w3 := toPrim (ctxt.submit (EST fbr x [])) w2
in MkIORes (Done b $ Succeeded fbr) w3
MkIORes () w4 := toPrim ctxt.wakeup w3
in MkIORes (Done b $ Succeeded fbr) w4

prim m b c (Poll x) w =
let MkIORes Nothing w2 := toPrim x w
Expand All @@ -521,7 +517,7 @@ parameters {auto ctxt : AsyncContext}
-> Async es a
-> Stack es fs a b
-> PrimIO (EvalRes fs b)
step 0 m b act stck w = MkIORes (ECede b act stck) w
step 0 m b act stck w = MkIORes (ECont b act stck) w

step (S k) m b (AB c p f) stck w =
step k m b (set c p) ((c,f)::stck) w
Expand All @@ -541,8 +537,12 @@ parameters {auto ctxt : AsyncContext}
cs <- readMVar f.canceled
res <- primIO (step ctxt.limit f.canceled cs.canceled act stck)
case res of
ECede b act' stck' => cancel b f >> ctxt.submit (EST f act' stck')
EDone b res => cancel b f >> ignore (complete f.outcome res)
ECont b act' stck' =>
cancel b f >> ctxt.submit (EST f act' stck') >> ctxt.wakeup
ECede b act' stck' =>
cancel b f >> ctxt.submit (EST f act' stck')
EDone b res =>
cancel b f >> ignore (complete f.outcome res) >> ctxt.wakeup

--------------------------------------------------------------------------------
-- Running asynchronous computations
Expand All @@ -557,7 +557,8 @@ childOnAsync :
-> IO ()
childOnAsync parent as cb = do
fb <- newFiber parent
eval (EST fb (guarantee as (liftIO . cb)) [])
ctxt.submit (EST fb (guarantee as (liftIO . cb)) [])
ctxt.wakeup

||| Runs the given asynchronous computation to completion
||| invoking the given callback once it is done.
Expand Down
19 changes: 12 additions & 7 deletions src/System/UV/Loop.idr
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import IO.Async.Token

import Data.IORef
import System
import System.UV.Raw.Async
import System.UV.Raw.Handle
import System.UV.Raw.Loop
import System.UV.Raw.Pointer
import System.UV.Raw.Idle

import public IO.Async
import public System.UV.Data.Error
Expand All @@ -22,14 +22,20 @@ record UVLoop where
[noHints]
constructor MkLoop
loop : Ptr Loop
async : Ptr Async
tg : TokenGen
cc : CloseCB
ref : IORef (SnocList EvalST)
limit : Nat

export %inline %hint
loopTokenGen : UVLoop => AsyncContext
loopTokenGen @{l} = AC l.tg (\x => modifyIORef l.ref (:< x)) l.limit
loopTokenGen @{l} =
AC
l.tg
(\x => modifyIORef l.ref (:< x))
(ignore (uv_async_send l.async))
l.limit

export %inline %hint
loopCloseCB : UVLoop => CloseCB
Expand All @@ -43,14 +49,13 @@ defaultLoop = do
tg <- newTokenGen
ref <- newIORef {a = SnocList EvalST} [<]
cc <- defaultClose
pa <- mallocPtr Async

let loop := MkLoop l tg cc ref 100
let loop := MkLoop l pa tg cc ref 100

pc <- mallocPtr Idle
r1 <- uv_idle_init l pc
r2 <- uv_idle_start pc $ \x => do
r2 <- uv_async_init l pa $ \x => do
readIORef ref >>= \case
[<] => ignore (uv_idle_stop x) >> uv_close x cc
[<] => uv_close x cc
ss => writeIORef ref [<] >> traverse_ eval (ss <>> [])
pure loop

Expand Down
35 changes: 35 additions & 0 deletions src/System/UV/Raw/Async.idr
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module System.UV.Raw.Async

import System.UV.Raw.Callback
import System.UV.Raw.Handle
import System.UV.Raw.Loop
import System.UV.Raw.Pointer
import System.UV.Raw.Util

%default total

--------------------------------------------------------------------------------
-- FFI
--------------------------------------------------------------------------------

%foreign (idris_uv "uv_async_init")
prim__uv_async_init : Ptr Loop -> Ptr Async -> AnyPtr -> PrimIO Int32

%foreign (idris_uv "uv_async_send")
prim__uv_async_send : Ptr Async -> PrimIO Int32

--------------------------------------------------------------------------------
-- API
--------------------------------------------------------------------------------

parameters {auto has : HasIO io}
export
uv_async_init : Ptr Loop -> Ptr Async -> (Ptr Async -> IO ()) -> io Int32
uv_async_init l h f = do
cb <- ptrCB f
uv_handle_set_data h cb
primIO $ prim__uv_async_init l h cb

export %inline
uv_async_send : Ptr Async -> io Int32
uv_async_send p = primIO $ prim__uv_async_send p
1 change: 0 additions & 1 deletion src/System/UV/Raw/Idle.idr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module System.UV.Raw.Idle

import Data.IORef
import System.UV.Raw.Callback
import System.UV.Raw.Handle
import System.UV.Raw.Loop
Expand Down
1 change: 1 addition & 0 deletions uv.ipkg
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ modules = IO.Async
, System.UV.Util
, System.UV.Work

, System.UV.Raw.Async
, System.UV.Raw.Callback
, System.UV.Raw.File
, System.UV.Raw.Handle
Expand Down

0 comments on commit 1801c63

Please sign in to comment.