Library for async Javascript with support for error handling and recovery.

srikumarks/IO.js

IO.js: A library for composable asynchronous actions

IO is a JavaScript library for managing the sequencing of asynchronous actions. This InfoQ article summarizes the state of the art through interviews with the creators of the existing libraries for managing asynchronous operations in JS: Do, Step, Flow-js, node-promise, Async, Async.js, FuturesJS and slide-flow-control. What stands out is that most of them say relatively little about "error management". There is also a Hacker News thread discussing Python creator Guido van Rossum's objections to callback-based APIs, based on their poor ability to work with exceptions.

The focus of IO, therefore, is to provide for flexible error management, in particular --

  1. Trapping errors and deciding what to do with them,
  2. Recovering from errors and resuming operations,
  3. Managing control flow between error handlers,
  4. Showing clearly the scope of control of error handlers that are in effect.

... and, of course, do the rest of the stuff that the other libraries do fairly well.

With regard to efficiency, IO takes the view that the tasks that you're executing ought to be much more complex than any overheads IO might add.

Core concepts

IO, at its core, is a library for creating "actions" and "running" them, usually using IO.run(input, action).

Actions

An action is a function that does something, possibly asynchronously, and chooses what to do next based on what it did. Actions are usually created and composed using the various functions provided by IO, but you can write your own as well. You run an action like this -

IO.run("some input data", action);

User supplied actions can come in one of four forms. The forms are detected using the number of arguments that the function has -

  1. Ordinary functions of the form --

    function (input) { return something; }
    

    These actions succeed by returning a result which is passed along, or fail by throwing an exception. If the return value is an action, that action is inserted into the sequence right there, supplying the same given input. This gets us "dynamic actions". If the return value is not an action and is a datum, it is considered to be the output of this action that is to be passed to the steps further ahead in the sequence. If the return value is undefined, the execution sequence stops right there.

  2. Pure action of the form --

    function (callback, errback) { ... }
    

    This is in the common "callback/errback" style where the callback is a one argument function used for continuing with the output of this action and errback is a one argument function that starts an error processing sequence.

  3. Input processing form --

    function (input, callback, errback) { ... }
    

    The input flows in at the point the action is executed and callback and errback are as described above. You'll mostly use the previous form and this form.

  4. Fully customizable form --

    function (M, input, success, failure) { ...  }
    

    M is the currently active orchestrator, input is the input available at the point the action is executed. The most important point to note is this --

    success and failure are also actions in this form.

    You'll rarely need this, but this allows you to change orchestrators on the fly, start other action sequences using the "current orchestrator", whatever that happens to be, tweak the control flow by affecting what comes before or after success and failure, etc. This form permits actions to be composed in IO and helps separate "what to do" from "when it is being done". If you want to trap the full continuations at any point to do something strange with them, you can use this form.
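As a rough illustration of how forms can be told apart by argument count, here is a minimal sketch built on the standard Function.prototype.length property. The classifyAction helper and the form names it returns are hypothetical and are not part of IO. Note that length counts only declared parameters before any default or rest parameter.

```javascript
// Hypothetical helper, not part of IO: classify a user-supplied action
// by its declared parameter count (Function.prototype.length).
function classifyAction(fn) {
    switch (fn.length) {
        case 1: return "ordinary"; // function (input)
        case 2: return "pure";     // function (callback, errback)
        case 3: return "input";    // function (input, callback, errback)
        case 4: return "custom";   // function (M, input, success, failure)
        default: throw new Error("Unsupported action arity: " + fn.length);
    }
}

var kinds = [
    function (input) { return input; },
    function (callback, errback) { callback(1); },
    function (input, callback, errback) { callback(input); },
    function (M, input, success, failure) {}
].map(classifyAction);
// kinds: ["ordinary", "pure", "input", "custom"]
```

One consequence of this style of detection is that unused parameters still matter: an input-processing action must declare all three parameters even if it never calls errback.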

Orchestrators

IO provides (currently) two ways to run actions -- IO.run and IO.trace. These two correspond to the IO.Ex and IO.Tracer objects called "orchestrators". Orchestrators are used for customizing the execution pipeline. (Note: This is still work-in-progress and only some basic functionality is available for customization.)

  1. IO.run(input, action) will run the action normally, passing the given input object to it. It is an alias for IO.Ex.run.

    IO.run(input, action) = IO.Ex.run(input, action)
    
  2. If you want to trace the steps of an action on console.log as it runs, you can turn it into a traced action using IO.trace, like this - IO.trace(action). So if you run IO.run(input, IO.trace(action)), you'll get tracing output on the console.

    IO.run(input, IO.trace(action)) = IO.Tracer(IO.Ex).run(input, action)
    

    IO.trace works by changing the orchestrator on the fly to a tracer built on the original orchestrator used to run the action. The semantics of the action aren't affected by the insertion of the trace. This design, as opposed to merely exposing the tracer, also lets you trace selected portions of a longer action sequence. You can use IO.trace as a replacement for IO.do.

Core actions

IO.do(actions)

Makes an action that performs the given actions in the given order, passing the output of each action to the next. The resulting compound action can be further composed with other actions.

IO.do(a1, a2, ...)
IO.do([a1, a2, ...])
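
To make "passing the output of each action to the next" concrete, here is a minimal standalone sketch of sequential composition for actions written in the input processing form. The sequence function is a hypothetical stand-in for the idea and is not how IO.do is implemented.

```javascript
// Hypothetical stand-in for the idea behind IO.do, not IO's implementation.
// Each action has the form function (input, callback, errback).
function sequence(actions) {
    return function (input, callback, errback) {
        function step(i, value) {
            if (i === actions.length) { return callback(value); }
            // The output of action i becomes the input of action i + 1.
            actions[i](value, function (output) { step(i + 1, output); }, errback);
        }
        step(0, input);
    };
}

function addOne(input, callback, errback) { callback(input + 1); }
function double(input, callback, errback) { callback(input * 2); }

var result;
sequence([addOne, double])(3,
    function (output) { result = output; },
    function (err) { throw err; });
// result is 8, i.e. (3 + 1) * 2
```

Because the composed value has the same three-argument shape as its parts, it can itself be placed inside another sequence, which is the sense in which compound actions remain composable.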

IO.try(actions, handler)

An action that performs the given actions and if any failure occurs, deems the actions to have failed and passes the error to the handler. The handler is joined to whatever follows after the try and can therefore continue by simply succeeding. If the handler fails, the whole try is considered to fail.

IO.try(action, handler)
IO.try([a1, a2, ...], handler)

IO.try(IO.log("some action"), IO.log("oops! Here is the error - ", true))

IO.alt(actions)

Short for "alternatives". The actions are tried in sequence and the first one to succeed passes its output to what follows the IO.alt. The whole alt action is semantically the same as that succeeding action. All actions receive the same input, unlike IO.try where the handler receives the error object of the failed action.

IO.alt(a1, a2, ...)
IO.alt([a1, a2, ...])
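
The "first one to succeed wins, and every alternative sees the same input" behaviour can be sketched without IO as follows. The firstSuccess helper is hypothetical, and failing with the last error when every alternative fails is an assumption of this sketch, since the text above does not say what IO.alt does in that case.

```javascript
// Hypothetical sketch of the "alternatives" idea, not IO's implementation.
function firstSuccess(actions) {
    return function (input, callback, errback) {
        function attempt(i, lastError) {
            // Assumption: when all alternatives fail, fail with the last error.
            if (i === actions.length) { return errback(lastError); }
            // Every alternative receives the same original input.
            actions[i](input, callback, function (err) { attempt(i + 1, err); });
        }
        attempt(0, new Error("no alternatives given"));
    };
}

function parseJson(input, callback, errback) {
    var value;
    try { value = JSON.parse(input); } catch (e) { return errback(e); }
    callback(value);
}
function asPlainText(input, callback, errback) { callback({ text: input }); }

var outputs = [];
var parse = firstSuccess([parseJson, asPlainText]);
parse('{"a": 1}', function (v) { outputs.push(v); }, function (e) { outputs.push(e); });
parse("not json", function (v) { outputs.push(v); }, function (e) { outputs.push(e); });
// outputs[0] is {a: 1}; outputs[1] is {text: "not json"}
```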

IO.raise(info)

Raises an in-sequence error meant for handling by whatever handlers have been set up. The info is arbitrary and is just passed along with the error object in the error field.

IO.raise("some error object")

IO.catch(onerror)

Sets up a "catch point" for trapping errors raised using IO.raise. This is useful for implementing commit-rollback semantics. onerror is itself an action.

The closest catch point gets to have a go first and can do a variety of things -

  1. Decide that it cannot handle the error and pass on to catch points "higher up". To do this, the handler must "fail".

    IO.catch(IO.fail)
    IO.catch(function (err, restart, giveup) {
        // ...
        giveup("some reason, maybe?");
    })
    
  2. Do something and try the sequence of actions immediately following this catch point once more. This is called a "restart". You can even set up loops this way. To do this, the handler must "succeed".

    IO.catch(function (err, restart, giveup) {
        // do something
        restart("new input");
    })
    
    // Ex: This does an infinite restart loop.
    IO.do(IO.log("one")
        , IO.catch(function (err, restart, giveup) {
              restart("again");
          })
        , IO.log("two")
        , IO.raise("forever"))
    
  3. Take some corrective action and resume from the raise point as though it succeeded. This "resume" action is available as the "resume" field of the error object, which you can call like error.resume(value) and the given value will be injected there.

    IO.catch(function (err, restart, giveup) {
        // take corrective action here
        err.resume("new input");
    })
    
    // Ex: 
    IO.run("input", 
        IO.trace(IO.log("one")
            , IO.log("two")
            , IO.catch(function (err, restart, giveup) {
                err.resume("YAY!");
              })
            , IO.log("three")
            , IO.raise("BOMB!")
            , IO.log("surprise!")))
    
  4. Do the "rollback" sequence again from the error point. This action is available in the error object and is invoked as error.rollback(value). The value you pass to the rollback function will usually be the error object itself.

    IO.catch(function (err, restart, giveup) {
        // Oh, we figured we can retry!
        err.rollback(err);
    })
    
  5. Deep customization relative to the error point is available through the success and failure actions stored in the error object. You can use this, for example, to change what happens before or after the resume completes, such as logging the error in a database.

    IO.catch(function (M, err, success, failure) {
        // Note that 'success' and 'failure' are complete
        // actions in themselves and not in the
        // "callback/errback" one-argument style.
        M.call(some_complex_action, err, success, failure);
    })
    

IO.finally(cleanup, action)

Composes the given action with a cleanup action such that the cleanup action will run after action completes successfully or just before action fails and control is about to leave all steps in action. The cleanup action is passed the same input passed to action and the output of the cleanup action is discarded before continuing.

You want the cleanup action to be as invisible to the surrounding context as possible, so for semantic purposes IO.finally(cleanup, action) would just be equivalent to action as though there is a garbage collector automatically cleaning up whenever necessary.

Note that when an IO.finally clause is preceded by an IO.catch (somewhere), the cleanup action will run before control gets to the catch. If the catch then decides to resume using err.resume(value), execution resumes from the finally onwards and not right at the error point. So if you want to trap errors before the cleanup action of a finally gets to run, you have to put the catch within the finally's scope.

IO.fork(actions, progress)

Makes an action that starts off all the actions in the given actions array asynchronously. The resultant action will continue only after all of these actions complete (successfully, or by failing). The results of the actions are accumulated in an array, which is passed further down the sequence once all the actions finish. The array will contain either success values or IO.Error objects if errors occurred.

You can optionally supply a progress function that will be called to be notified as and when each of the actions complete. The progress argument has to be a function of the form -

function (i, done, total) {...}

where i is the index of the action that completed, done is the number of actions completed up to the progress call, and total is the total number of actions given.

IO.do(IO.log("before")
    , IO.fork([a1, a2, ...])
    , IO.log("after"))
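
The join-with-progress behaviour described above can be sketched in plain callback style. The forkJoin helper below is hypothetical and not IO's code. In particular it stores raw error values in the results array, whereas IO is described as storing IO.Error objects.

```javascript
// Hypothetical sketch of fork/join with progress reporting; not IO's code.
function forkJoin(actions, progress) {
    return function (input, callback, errback) {
        var results = new Array(actions.length);
        var done = 0;
        if (actions.length === 0) { return callback(results); }
        actions.forEach(function (action, i) {
            var settled = false;
            function settle(value) {
                if (settled) { return; } // ignore repeated completions
                settled = true;
                results[i] = value;
                done += 1;
                if (progress) { progress(i, done, actions.length); }
                if (done === actions.length) { callback(results); }
            }
            // Success values and errors are both collected; the join itself never fails.
            action(input, settle, settle);
        });
    };
}

var progressLog = [];
var joined;
forkJoin([
    function (input, callback, errback) { callback(input + "-a"); },
    function (input, callback, errback) { errback(new Error("b failed")); }
], function (i, done, total) {
    progressLog.push(i + ":" + done + "/" + total);
})("x", function (results) { joined = results; }, function () {});
// progressLog: ["0:1/2", "1:2/2"]; joined[0] is "x-a"; joined[1] is an Error
```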

IO.tee(action)

Spawns off the given action without joining it with the following steps. IO.tee always succeeds. The errors in the action are not part of the sequence that the tee belongs to and won't touch it in any way.

If you want to fork out some actions but don't want to wait for them to join back again, do this -

IO.tee(IO.fork([a1, a2, ...]))

IO.chan()

Makes a CSP-style channel that can be used to communicate between two asynchronous sequences that may potentially be running within two different orchestrators. The return value is an object with two properties named 'send' and 'recv' which are themselves actions. The 'send' action will send its input to receivers. Usually, there should only be one receiver per channel, but if there are many, then the order in which the data sent through send will be delivered to them is unspecified.

Here is a silly example -

var c = IO.chan();
var A = IO.do([
    IO.log('A1'),
    IO.delay(5000),
    IO.log('A2'),
    IO.supply('hello'),
    c.send,
    IO.delay(1000),
    IO.log('A3'),
    c.recv,
    IO.log('A4', true)
]);
var B = IO.do([
    IO.log('B1'),
    c.recv,
    IO.log('B2', true),
    IO.delay(2000),
    IO.log('B3'),
    IO.supply('greetings'),
    c.send
]);
IO.run('start', A);
IO.run('stop', B);

IO.interruptible(func)

Given a function (oninterrupt) -> action, produces an action that can be interrupted when it is running. The result action is given a property called interrupt, which is an action that can be executed in any sequence to interrupt the result action when it is running.

oninterrupt is a function of type function (handler) -> void, where handler is a zero-argument procedure that should be called when the result action is interrupted.

For example, IO.Browser.get produces such an interruptible action. In general, IO.interruptible is intended for wrapping such low level async routines such as XMLHttpRequest. Actions composed using IO are better dealt with using IO.try, IO.finally and IO.catch.

IO.timeout(ms, action, ontimeout)

Makes an action that couples the given action with a watchdog timer. If the watchdog fires before the action finishes, then the main action sequence will be considered to fail and the ontimeout action will be run before proceeding to fail. The input to the ontimeout action is the whole timeout action itself, passed as a zero-argument function, so that it can be restarted if necessary. Note that this is a weak timeout, in the sense that the action is not aborted. Only the continuation is aborted.

IO.gen(generatorFunc, delay_ms = 0)

An action that will run the following continuation with the values produced by generatorFunc until it returns undefined. It will wait for delay_ms between the invocations.

This action is called a "generator". It will pause when it detects an IO.PauseCondition being raised in the actions that follow. The entity responsible for raising the condition can subsequently resume the generator when the time comes. IO.atomic is one such action that can raise an IO.PauseCondition.

IO.pause

An action that will forever pause any generators driving the action sequence that it is a part of. It is called IO.pause because it raises an IO.PauseCondition.

IO.spray(array)

A generator that enumerates the array and sends its values down the action chain. The array argument is optional. If omitted, then the input is expected to be an array to enumerate.

IO.run(["hello", "world"], IO.do(IO.spray, IO.log("-> ", true)));
// will produce
// -> 'hello'
// -> 'world'

IO.cycle(array)

Similar to IO.spray, except that it cyclically enumerates the array forever. The array argument is optional. If omitted, then the input is expected to be an array to enumerate.

IO.enumFrom(from, step, to)

A generator that sends number sequences down the chain. The sequence values are from <= i < to stepping by step. If to is omitted, the sequence increments or decrements according to step forever. If step is omitted, it defaults to 1. If from is omitted, it defaults to 0.
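
The defaults and the half-open range can be illustrated with a small value source of the kind IO.gen is described as consuming. This sketch assumes that such a function takes no arguments and returns undefined when exhausted. The rangeSource name is hypothetical, and only ascending bounded ranges and unbounded ranges are handled here.

```javascript
// Hypothetical value source mirroring the documented defaults; not IO's code.
function rangeSource(from, step, to) {
    var i = (from === undefined) ? 0 : from;
    var s = (step === undefined) ? 1 : step;
    return function next() {
        // With a bound, values satisfy from <= i < to; without one, never stop.
        if (to !== undefined && i >= to) { return undefined; }
        var value = i;
        i += s;
        return value;
    };
}

var next = rangeSource(0, 2, 7);
var values = [];
for (var v = next(); v !== undefined; v = next()) { values.push(v); }
// values: [0, 2, 4, 6]
```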

IO.collectUntil(test)

Accumulates the input that arrives into an array, sending the array out every time a new value is added. The collection terminates when the input satisfies the given test function (function (input) -> Boolean). If no test function is given, then the collection will terminate when the input is undefined.

IO.atomic(action)

Builds an action that makes the given action operate "atomically". In other words, no matter how many action sequences use the resultant atomic action, there will, at any given time, be at most one running instance of it. If the action takes a long time, all those run calls will accumulate at the entry point of this action and will be processed in sequence one at a time. This is useful if the intermediate steps contain any "open database -> do something -> close database" kind of actions.

IO.atomic(a1, a2, ...)
IO.atomic([a1, a2, ...])
    = IO.atomic(IO.do(a1, a2, ...))

Note: IO.atomic maintains a finite-sized buffer of processing requests. Once the buffer is full, it raises an IO.PauseCondition, which is trapped by the generators (IO.gen and its ilk) that pump requests to the atomic action. Once buffer space is available, the paused generators are resumed. All this is usually transparent to you. You need to be aware of it only if you're continuously driving atomic actions using IO.run, in which case you need to put in an appropriate handler for IO.PauseCondition.
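
The "at most one running instance, others wait at the entry point" behaviour can be sketched with a simple FIFO queue. The serialize helper below is hypothetical. It is not IO's implementation, and it has neither the finite buffer nor the IO.PauseCondition mechanism described in the note above.

```javascript
// Hypothetical sketch of serializing an action; not IO's implementation.
function serialize(action) {
    var queue = [];
    var busy = false;
    function next() {
        if (busy || queue.length === 0) { return; }
        busy = true;
        var job = queue.shift();
        function finish(continuation) {
            return function (value) {
                busy = false;
                continuation(value);
                next(); // start the next waiting request, if any
            };
        }
        action(job.input, finish(job.callback), finish(job.errback));
    }
    return function (input, callback, errback) {
        queue.push({ input: input, callback: callback, errback: errback });
        next();
    };
}

// Demo: completion is triggered by hand so the ordering is easy to see.
var pending = [];
var started = [];
var finished = [];
var slow = serialize(function (input, callback, errback) {
    started.push(input);
    pending.push(function () { callback(input + " done"); });
});
slow("a", function (v) { finished.push(v); }, function () {});
slow("b", function (v) { finished.push(v); }, function () {});
var startedBefore = started.slice(); // only "a" has started; "b" is queued
pending.shift()(); // completing "a" lets "b" start
pending.shift()(); // complete "b"
```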

IO.pipeline(actions)

Builds a pipeline of the actions in the given array, turning each action into a FIFO processor. This is just a convenient shorthand for an action that can be easily built using IO.do and IO.atomic as shown below.

IO.pipeline(a1, a2, ...)
    = IO.pipeline([a1, a2, ...])
    = IO.do([a1, a2, ...].map(IO.atomic))

IO.cond(branches)

A simple kind of dynamic action that chooses from a given list of actions by pattern matching on the input. branches is an array of two-element arrays -- i.e. it is of the form [[pat1, action1], [pat2, action2], ...]. The patN are patterns to be matched against the input and if patK is the first pattern that matches, then actionK is run as though the whole cond was that action from the beginning.

If none of the patterns match, cond raises an error (the error action is IO.cond.error). You're supposed to think of all valid cases and support them. The "raises error" behaviour is there to help you do that. You can customize this by passing in an action to be used in this case as an extra argument.

Patterns

Patterns can be -

  1. Simple literals such as string and numbers that are matched against the input using ===.

    IO.cond([
        //...
        ["literal", IO.log("some action"), ...]
        //...
    ]);
    
  2. Object literals. In this case, the input must be an object and the keys provided must all exist in the input and the values must also pattern match. The "patterns" in the value can be, again, of any type in this list.

    IO.cond([
        //...
        [{key1: pat1, key2: pat2, ...}, IO.log("some action"), ...]
        //...
    ]);
    
  3. A one argument function whose boolean result decides the success of the pattern match. Since functions can't usually be meaningfully compared or pattern matched against, this is a useful way to provide customizable matching support. IO.cond.true is a trivial pattern matcher that always succeeds and IO.cond.false is one that always fails, which can be used as such matchers.

    IO.cond([
        //...
        [function (val) { return val > 3 && val < 10; }, IO.log("some action"), ...]
        //...
    ])
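
The three pattern kinds can be summarized by a small recursive matcher. This is a hypothetical sketch of the matching rules as described above, with matches and pickBranch as made-up names. It is not the matcher IO.cond actually uses.

```javascript
// Hypothetical matcher illustrating the three pattern kinds; not IO's code.
function matches(pattern, input) {
    if (typeof pattern === "function") {
        return Boolean(pattern(input)); // kind 3: predicate function
    }
    if (pattern !== null && typeof pattern === "object") {
        // kind 2: every key must exist in the input and its value must match
        if (input === null || typeof input !== "object") { return false; }
        return Object.keys(pattern).every(function (key) {
            return (key in input) && matches(pattern[key], input[key]);
        });
    }
    return pattern === input; // kind 1: literal compared using ===
}

// Returns the action paired with the first matching pattern.
function pickBranch(branches, input) {
    for (var i = 0; i < branches.length; i += 1) {
        if (matches(branches[i][0], input)) { return branches[i][1]; }
    }
    throw new Error("no pattern matched");
}

var chosen = pickBranch([
    ["literal", "A"],
    [{ error: "Biff!" }, "B"],
    [function (val) { return val > 3 && val < 10; }, "C"]
], { error: "Biff!", input: 42 });
// chosen is "B"
```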
    

IO.map(fn)

An action that transforms the input it receives using the given function fn and sends that value as its output.

IO.filter(pred)

An action that will let only those inputs pass through which satisfy the given one-argument predicate function. Other inputs are just drained.

IO.reduce(reductionFn, initialValue)

Makes a reducing action that will, for each input received, apply the given reduction function (of the form reductionFn(accum, value)) to accumulate some result and send the accumulated result to its output.
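
The three stream-style actions above can be sketched as plain input processing functions. The mapAction, filterAction and reduceAction names are hypothetical, and the nested callbacks in the demo only stand in for what IO.do would do.

```javascript
// Hypothetical sketches of the map, filter and reduce ideas; not IO's code.
function mapAction(fn) {
    return function (input, callback, errback) { callback(fn(input)); };
}
function filterAction(pred) {
    return function (input, callback, errback) {
        if (pred(input)) { callback(input); } // other inputs are simply dropped
    };
}
function reduceAction(reductionFn, initialValue) {
    var accum = initialValue;
    return function (input, callback, errback) {
        accum = reductionFn(accum, input);
        callback(accum); // the running result is sent out for every input
    };
}

var isEven = filterAction(function (n) { return n % 2 === 0; });
var timesTen = mapAction(function (n) { return n * 10; });
var runningSum = reduceAction(function (accum, value) { return accum + value; }, 0);

var sums = [];
[1, 2, 3, 4].forEach(function (n) {
    isEven(n, function (even) {
        timesTen(even, function (scaled) {
            runningSum(scaled, function (total) { sums.push(total); });
        });
    });
});
// sums: [20, 60]
```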

Trivial but useful actions

IO.pass

This is a trivial action that you can use as the return value of a function that wants to return a no-op.

IO.cond([
    //...
    ["some pat", IO.pass]
    //..
])

IO.fail

The trivial fail action jumps to the failure continuation.

IO.cond([
    //...
    ["some pat", IO.fail]
    //...
])

IO.forgive

A simple action for use with catch that discards the error object and restarts the sequence after the catch using the input provided in the error object.

IO.do(
    IO.catch(IO.cond([
        [{error: "Biff!"}, IO.log("Biff!"), IO.forgive],
        [{error: "Whew!"}, IO.log("Whew!"), IO.fail]
    ])),
    IO.log("fight"),
    function (input) {
        return IO.raise(Math.random() < 0.8 ? "Biff!" : "Whew!");
    }
)

IO.log(string, inputAlso)

An action that logs the string when it is hit and continues normally. If inputAlso is true, then the input is also printed using JSON.stringify.

IO.log("hello")

IO.probe(func)

Generates an action that calls func with the input that came into the action, but otherwise doesn't affect the flow and the sequence will automatically continue. Notice that func is not given access to the sequencing object, so it cannot affect the sequence, not even by throwing an exception.

IO.probe(function (input) {
    document.write("input received");
})

IO.add(input)

This generates an action that adds the keys of the supplied input object to the input that arrived via the action sequence. This is useful for inserting additional information into the "data stream".

IO.run({one: "ek"}, 
    IO.trace(IO.log("before", true)
           , IO.add({two: "do"})
           , IO.log("after", true)));

IO.supply(input)

Similar to IO.add, but ignores the input that came in through the action sequence and replaces it with the given input object.

IO.run({one: "ek"}, 
    IO.trace(IO.log("before", true)
           , IO.supply({two: "do"})
           , IO.log("after", true)));

IO.delay(ms)

Generates an action that will wait for the given number of milliseconds before proceeding.

IO.do(IO.delay(1000), IO.log("bing!"))

IO.clock(ms)

A clock triggers the action sequence that follows periodically. You can put a clock in front of an action by composing it using IO.do like this -

var periodicAction = IO.do(IO.clock(500), someAction);

The above clock will then send an incrementing counter to someAction every 500ms.

The clock is inactive upon creation. You start the clock by running the composed action with "start" as the input. Running it again with "stop" will stop the periodic runs and running it with "reset" will reset the clock counter back to 0.

WARNING : It is up to you to make sure that the action being triggered by the clock will not take longer than the time interval between two ticks. Otherwise, actions will start accumulating and your system, at some point, will go down. This is intended for stuff like periodic cleanup actions and keep-alives, where the time taken by the action is minuscule compared to the tick interval.

IO.sync(N)

Returns an object with two actions with keys "now" and "later". You can use them in two different action sequences that you want to synchronize at a certain point. The "now" action will continue when at least N of the "later" actions have happened.

N defaults to 1 if you either leave it out or pass 0, since it makes no sense to make a "now" action that will never fire .. which is what N = 0 would have to mean.
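
The relationship between "now" and "later" is essentially a counting latch, which the following sketch illustrates. The makeSync helper is hypothetical, and the choice to pass each action's own input through unchanged is an assumption of the sketch rather than documented IO behaviour.

```javascript
// Hypothetical counting latch illustrating "now" and "later"; not IO's code.
function makeSync(N) {
    var needed = N || 1; // leaving N out or passing 0 means 1
    var seen = 0;
    var waiting = [];
    return {
        later: function (input, callback, errback) {
            seen += 1;
            if (seen >= needed) {
                // Release every "now" that has been waiting.
                waiting.splice(0).forEach(function (resume) { resume(); });
            }
            callback(input);
        },
        now: function (input, callback, errback) {
            if (seen >= needed) { return callback(input); }
            waiting.push(function () { callback(input); });
        }
    };
}

var s = makeSync(2);
var events = [];
s.now("n", function () { events.push("now continued"); }, function () {});
s.later("l1", function () { events.push("later 1"); }, function () {});
s.later("l2", function () { events.push("later 2"); }, function () {});
// events: ["later 1", "now continued", "later 2"]
```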

IO.WebServer(port, options)

This is an experimental continuation based web server built atop IO for running in Node.js. For a simple example, see the Arc challenge written using IO.WebServer. The web server is loaded in Node.js like this -

var IO = require('./path/to/IO.WebServer.js');

You make a "web server instance" like this -

var WS = IO.WebServer(8080);

Following this, you set up the routes and actions using the WS object and finally start the web server using -

WS.start();

If options.key and options.cert are set to the security credentials of the server, then the server uses https, otherwise it uses http. The options argument can be omitted. You can have multiple web server instances running on different ports.

Once you set up routes, which are mappings between URLs and IO actions, the server will route GET and POST requests to the corresponding actions, passing a "connection" object to the action. The handler action uses the connection object to provide response material to the client.

While GET requests are simple, the server collects form arguments for POST requests and makes them available as a dictionary in the data property of the connection object passed on as input to route actions.

The connection object passed to handler actions has the following properties -

  • .id is the request id number.
  • .route is the route object. The main property of concern to the user is .route.path, which is the URL fragment.
  • .request is the Node.js http request object.
  • .response is the Node.js http response object to which the handler writes responses.
  • .data is, in the case of POST requests, a dictionary of form keys to decoded values.

WS.route(path, action, recursive)

This adds a route that results in the given action (as in IO action) being triggered when a request for that route arrives. recursive being true indicates that this route mapping applies to all sub-routes if a more specific route entry isn't found. For recursive routes, the path must end in '/'.

WS.route('/greeting', WS.page(WS.write('Hello world!')));

The "input" that is passed to the action is the Node.js connection object, so the receiving action can make use of any form values passed in.
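
The lookup rule for recursive routes (exact match first, otherwise the nearest enclosing recursive route) can be sketched as follows. The makeRouter helper and its find method are hypothetical and do not reflect how IO.WebServer stores or resolves routes. The strings used as actions are placeholders.

```javascript
// Hypothetical route table lookup; not IO.WebServer's implementation.
function makeRouter() {
    var routes = {};
    function has(path) { return Object.prototype.hasOwnProperty.call(routes, path); }
    return {
        route: function (path, action, recursive) {
            if (recursive && path.charAt(path.length - 1) !== "/") {
                throw new Error("recursive route paths must end in '/'");
            }
            routes[path] = { path: path, action: action, recursive: Boolean(recursive) };
        },
        find: function (url) {
            if (has(url)) { return routes[url]; } // a more specific entry wins
            // Otherwise walk up the path: "/a/b/c" tries "/a/b/", "/a/", "/".
            var end = url.length;
            while (end > 0) {
                end = url.lastIndexOf("/", end - 1);
                if (end < 0) { break; }
                var prefix = url.slice(0, end + 1);
                if (has(prefix) && routes[prefix].recursive) { return routes[prefix]; }
            }
            return null;
        }
    };
}

var router = makeRouter();
router.route("/greeting", "greet");
router.route("/static/", "files", true);
router.route("/static/logo.png", "logo");
var found = [
    router.find("/static/logo.png").action,     // "logo"
    router.find("/static/css/site.css").action, // "files"
    router.find("/nothing")                     // null
];
```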

WS.serveFile(path, options)

An action that will serve up the file identified by the given local path.

options.mime_type can be the MIME type of the file if you know it. Otherwise it will be guessed.

options.encoding can be given if known. If unspecified, utf8 is assumed for text files and binary for others.

WS.route('/logo.png', WS.serveFile('./static_files/artwork/logo.png'));

WS.serveDir(dir)

Maps all sub-paths of the route URL to sub-paths of the given directory.

WS.serveURL(urlroot)

Like WS.serveFile and WS.serveDir but pipes the contents of the given URL to the client.

WS.page(actions)

Wraps actions that generate content for a given response page.

WS.route('/greet', WS.page(WS.write('Hello World!')));

WS.write(content)

An action that writes the given content to the client connection. The content can be either a string or a function (W, conn) -> String, in which case the function will be called with the appropriate session object and client request information.

WS.expire(timeout_secs)

Causes the current session to expire in timeout_secs. Session expiry condition is indicated by an IO.raise('session_expired') happening when a client request is being handled.

Dynamic links

Inside an action, you can generate dynamic URLs bound to newly determined actions to continue with other steps that the client might take. The session object is simply the "orchestrator" active during the execution of any action sequence. This orchestrator (call it W) has a couple of methods for generating such dynamic links. The dynamic links generated are subject to the expiration setting specified using WS.expire(timeout_secs).

W.link(action)

Returns a URL which will result in the given action being run when a request is made to it.

W.links(name_action_map)

Given a name-to-action map in the form of an object, returns an isomorphic object with the same names but whose values are URLs that will result in those actions being triggered. The names are merely for programming convenience and may be useful in template engines.
