Skip to content

Commit

Permalink
chore: add logging stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ascorbic committed Oct 17, 2023
1 parent 297cf3f commit dcf3226
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 1 deletion.
5 changes: 4 additions & 1 deletion packages/demo-site/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { PassThrough } from 'node:stream'

import type { AppLoadContext, EntryContext } from '@remix-run/node'
import { createReadableStreamFromReadable } from '@remix-run/node'
import { createReadableStreamFromReadable } from './stream'
import { RemixServer } from '@remix-run/react'
import isbot from 'isbot'
import { renderToPipeableStream } from 'react-dom/server'
Expand Down Expand Up @@ -103,6 +103,9 @@ function handleBrowserRequest(
onShellError(error: unknown) {
reject(error)
},
onAllReady() {
console.log('all ready')
},
onError(error: unknown) {
responseStatusCode = 500
// Log streaming rendering errors from inside the shell. Don't log
Expand Down
120 changes: 120 additions & 0 deletions packages/demo-site/app/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import type { Readable } from 'node:stream'
import { Stream } from 'node:stream'

export const createReadableStreamFromReadable = (source: Readable & { readableHighWaterMark?: number }) => {
let pump = new StreamPump(source)
let stream = new ReadableStream(pump, pump)
return stream
}

class StreamPump {
public highWaterMark: number
public accumalatedSize: number
private stream: Stream & {
readableHighWaterMark?: number
readable?: boolean
resume?: () => void
pause?: () => void
destroy?: (error?: Error) => void
}
private controller?: ReadableStreamController<Uint8Array>

constructor(
stream: Stream & {
readableHighWaterMark?: number
readable?: boolean
resume?: () => void
pause?: () => void
destroy?: (error?: Error) => void
},
) {
this.highWaterMark = stream.readableHighWaterMark || new Stream.Readable().readableHighWaterMark
this.accumalatedSize = 0
this.stream = stream
this.enqueue = this.enqueue.bind(this)
this.error = this.error.bind(this)
this.close = this.close.bind(this)
}

size(chunk: Uint8Array) {
return chunk?.byteLength || 0
}

start(controller: ReadableStreamController<Uint8Array>) {
console.log('start')
this.controller = controller
this.stream.on('data', this.enqueue)
this.stream.once('error', this.error)
this.stream.once('end', this.close)
this.stream.once('close', this.close)
}

pull() {
console.log('pull')
this.resume()
}

cancel(reason?: Error) {
console.log('cancel')
if (this.stream.destroy) {
this.stream.destroy(reason)
}

this.stream.off('data', this.enqueue)
this.stream.off('error', this.error)
this.stream.off('end', this.close)
this.stream.off('close', this.close)
}

enqueue(chunk: Uint8Array | string) {
console.log('enqueue')
if (this.controller) {
try {
let bytes = chunk instanceof Uint8Array ? chunk : Buffer.from(chunk)

let available = (this.controller.desiredSize || 0) - bytes.byteLength
this.controller.enqueue(bytes)
if (available <= 0) {
this.pause()
}
} catch (error: any) {
this.controller.error(
new Error(
'Could not create Buffer, chunk must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object',
),
)
this.cancel()
}
}
}

pause() {
console.log('pause')
if (this.stream.pause) {
this.stream.pause()
}
}

resume() {
console.log('resume')
if (this.stream.readable && this.stream.resume) {
this.stream.resume()
}
}

close() {
console.log('close')
if (this.controller) {
this.controller.close()
delete this.controller
}
}

error(error: Error) {
console.log('error')
if (this.controller) {
this.controller.error(error)
delete this.controller
}
}
}

0 comments on commit dcf3226

Please sign in to comment.