Skip to content

hazae41/cascade

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Cascade

Never let streams give you a headache again

npm i @hazae41/cascade

Node Package πŸ“¦

Why

It took me 1 year to manage JavaScript streams in such a way as to avoid headaches, fortunately you can skip this struggle and never get any headache (and even avoid Safari bugs!)

Features

Current features

  • 100% TypeScript and ESM
  • No external dependencies
  • No need for controller access
  • Simplex, Half-Duplex, Full-Duplex
  • Use Symbol.dispose to close streams

Usage

Streams

For basic scenarios, Cascade provides streams but with an accessible controller

const encoder = new SuperTransformStream<Uint8Array, Uint8Array>({ write: onWrite })

async function onWrite(chunk: Uint8Array) {
  /** 
   * No need to get the controller 
   */
  await stream.enqueue(encode(chunk))
}

example.readable
  .pipeTo(encoder.substream.writable)
  .then(() => console.log("Closed"))
  .catch(e => console.error("Errored", e))

encoder.substream.readable
  .pipeTo(example.writable)
  .then(() => console.log("Closed"))
  .catch(e => console.error("Errored", e))

/**
 * You can close it at any time
 */
encoder.terminate()

Plexes

For more complex scenarios, Cascade provides plexes, which are streams with events, and you can associate them to build complex and reliable structures

Simplex

A basic in-out stream with events

const simplex = new Simplex<Uint8Array>()

/**
 * You can use pipes
 */
example.readable
  .pipeTo(simplex.writable)
  .then(() => console.log("Closed"))
  .catch(e => console.error("Errored", e))

simplex.readable
  .pipeTo(example.writable)
  .then(() => console.log("Closed"))
  .catch(e => console.error("Errored", e))

/**
 * You can also use events
 */
const simplex = new Simplex<Uint8Array>({
  /**
   * When the simplex starts
   */
  async start() {
    this.enqueue(new Uint8Array([0, 1, 2]))
  },
  /**
   * When the simplex is closing
   */
  async close() {
    this.enqueue(new Uint8Array([7, 8, 9]))
  },
  /**
   * When the simplex is erroring
   */
  async error(error) {
    console.log("Errored", error)
  },
  /**
   * When the simplex receives chunks
   */
  async write(chunk) {
    /**
     * Pass the chunk to the readable side
     */
    this.enqueue(chunk)
  },
})

/**
 * You can use enqueue at any time
 */
simplex.enqueue(new Uint8Array([4, 5, 6]))

/**
 * You can use error at any time
 */
simplex.error(new Error())

/**
 * You can use close at any time
 */
simplex.close()

/**
 * You can check if it's closed
 */
simplex.closed

/**
 * You can check if it's errored
 */
simplex.errored

/**
 * You can check if it's closed or errored
 */
simplex.stopped

Full-Duplex

A pair of simplexes that are closed independently

  • When one side is errored, the other is automatically errored
  • When one side is closed, the other is NOT automatically closed

Events

  • error β€” called ONCE when input OR output are errored
  • close β€” called ONCE when input AND output are closed
class Crypter extends FullDuplex<Uint8Array, Uint8Array> {

  constructor() {
    super({
      input: { message: m => this.#onInputMessage(m) },
      output: { message: m => this.#onOutputMessage(m) },
      close: () => this.#onClose(),
      error: e => this.#onError(e)
    })
  }

  async #onInputMessage(data: Uint8Array) {
    this.input.enqueue(await encrypt(data))
  }

  async #onOutputMessage(data: Uint8Array) {
    this.output.enqueue(await decrypt(data))
  }

  async #onError(reason?: unknown) {
    console.error("Errored", reason)
  }

  async #onClose() {
    console.log("Closed")
  }

}

function crypt(subprotocol: FullDuplex<Uint8Array, Uint8Array>) {
  const crypter = new Crypter()

  subprotocol.outer.readable.pipeTo(crypter.inner.writable).catch(() => { })
  crypter.inner.readable.pipeTo(subprotocol.outer.writable).catch(() => { })

  return crypter
}

Half-Duplex

A pair of simplexes that are closed together

  • When one side is errored, the other is automatically errored
  • When one side is closed, the other is automatically closed

Events

  • error β€” called ONCE when input OR output are errored
  • close β€” called ONCE when input OR output are closed
class MySocket extends EventTarget {

  readonly #duplex = new HalfDuplex<string, string>({
    input: { message: m => this.#onMessage(m) },
    error: e => this.#onError(e),
    close: () => this.#onClose(),
  })

  get inner() {
    return this.#duplex.inner
  }

  send(message: string) {
    this.#duplex.output.enqueue(message)
  }

  error(reason?: unknown) {
    this.#duplex.output.error(reason)
  }

  close() {
    this.#duplex.output.close()
  }

  async #onMessage(data: string) {
    this.dispatchEvent(new MessageEvent("message", { data }))
  }

  async #onError(reason?: unknown) {
    this.dispatchEvent(new ErrorEvent("error", { error: reason }))
  }

  async #onClose() {
    this.dispatchEvent(new Event("close"))
  }

}