Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototyping several changes for better support for reading bytes #287

Closed
wants to merge 93 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
538bb41
Add operation stream library
tyoshino Feb 19, 2015
bf34cdd
Fix operation stream and add a sample of network API implementation e…
tyoshino Feb 19, 2015
aef7cb9
Remove test.only
tyoshino Feb 19, 2015
f429621
Add pipe
tyoshino Feb 20, 2015
e1cc665
In pipe test, also check one operation.
tyoshino Feb 20, 2015
02e1a6c
Make pipe work with close
tyoshino Feb 20, 2015
ff8e97c
Factor out fake network reading code into FakeByteSource
tyoshino Feb 20, 2015
c0b48cb
Reorganize strategies in operation stream test
tyoshino Feb 20, 2015
4f65823
Reorganized operation stream file API sample
tyoshino Feb 20, 2015
80a88d9
Tweak on operation stream example
tyoshino Feb 20, 2015
67d4195
Factor out readAndVerify in operation stream test
tyoshino Feb 20, 2015
657f266
Fix checkWritableState. It should just throw.
tyoshino Feb 23, 2015
113628f
Add FakeBufferTakingByteSink
tyoshino Feb 23, 2015
4634d94
Add a pipe test from a buffer taking source to a sink with buffer
tyoshino Feb 23, 2015
5d94d03
Improve readability of "source with a buffer pool" test cases
tyoshino Feb 23, 2015
8241b5c
Simplify piping code in tests
tyoshino Feb 24, 2015
f33a4a6
Factor out view filling code
tyoshino Feb 24, 2015
7dd8a19
Factor out handlers
tyoshino Feb 24, 2015
383196b
Add catch() and asserts
tyoshino Feb 24, 2015
1b48552
Make pipeOperationStreams fulfill the returned promise.
tyoshino Feb 24, 2015
a797e4b
Factor out select() in pipeOperationStreams()
tyoshino Feb 24, 2015
2f4ef65
Factor out selecting function in tests
tyoshino Feb 24, 2015
d405f98
One more select factoring out
tyoshino Feb 24, 2015
22dbba5
Add pool return check
tyoshino Feb 24, 2015
1b99b82
Add waitSpaceChange and make pipeOperationStream() forward window info.
tyoshino Feb 24, 2015
8de0798
Move jointOps into pipeOperationStreams
tyoshino Feb 24, 2015
020d7fd
Add byteCountingPipe
tyoshino Feb 24, 2015
9dd062c
Fix window saving code
tyoshino Feb 24, 2015
2d7de48
Move puller and filler into FakeFile
tyoshino Feb 24, 2015
1d87cce
Bunch of reorganization to merge stream creation methods into source/…
tyoshino Feb 24, 2015
acd8034
Group tests
tyoshino Feb 24, 2015
9b10507
Add comment about createBufferFillingStream
tyoshino Feb 24, 2015
9084385
Remove unnecessary accessor from internal classes
Feb 24, 2015
ce8a331
Clean up
Feb 24, 2015
ae610b5
Remove stale code
Feb 24, 2015
f7ccb6f
Clean up
Feb 24, 2015
815649b
Remove unused code
Feb 24, 2015
3772a9e
Fix precondition check in abort()
tyoshino Feb 25, 2015
8b6462a
Remove ToRace from the variable holding promises to pass to Promise.r…
tyoshino Feb 25, 2015
afb6611
Fix rejection path in pipeOperationStreams()
tyoshino Feb 25, 2015
ed46e5c
Move pipeOperationStreams to ReadableOperationStream and rename to pi…
tyoshino Feb 25, 2015
41a96d6
Small fixes
tyoshino Feb 25, 2015
ce08395
Revert the change to move pipeOperationStreams to pipeTo and fix some…
tyoshino Feb 25, 2015
5cfaa5c
Fix abort/cancel signal propagation in pipeOperationStreams and add t…
tyoshino Feb 25, 2015
25d5076
Add a comment to selectOperationStreams and remove stale asserts
tyoshino Feb 25, 2015
6d2f051
Make jointOps no-op for unknown state
tyoshino Feb 25, 2015
913ade9
Check state on abort/cancel
tyoshino Feb 25, 2015
5c0b6cf
Rename OperationStream to OperationQueue since Stream is not always m…
Feb 26, 2015
a7c50c6
Rename wrappers for OperationQueue so that they don't sound like gene…
Feb 26, 2015
337588f
Explain that createOperationQueue is for creating queue-backed streams
Feb 26, 2015
8394449
Rename read() to readOperation() and add a syntax sugar read()
tyoshino Feb 27, 2015
adfd15c
Rename cancelled to errored
tyoshino Feb 27, 2015
ddd60a5
Rename aborted to errored
tyoshino Feb 27, 2015
6e3f9a1
Change semantics of ready and rename to readable and writable
tyoshino Feb 27, 2015
c9fdd2c
Add writer and reader
tyoshino Feb 27, 2015
3f6cd32
Propagate more errors in pipe
tyoshino Feb 27, 2015
28eb32a
Remove restoreWindowAndReject
tyoshino Feb 27, 2015
35ba1e3
Drastic redesign
tyoshino Feb 27, 2015
d94a040
Add operation queue file
tyoshino Feb 27, 2015
a9f45b9
Make things work after file reorganization
Feb 27, 2015
57a3c91
Reoreder methods
Feb 27, 2015
76df4ca
Use underlying source/sink approach
Feb 27, 2015
ba4d9f1
Hide details of OperationQueue
Feb 27, 2015
2adb0cf
Fix transition to drain
Feb 27, 2015
9918c76
More comments
Feb 27, 2015
6e43b6e
Rename
Feb 27, 2015
43dcaae
Tweak
Feb 27, 2015
aac7c91
Implemented Writer
Feb 27, 2015
b48944f
Use revealing constructor pattern
tyoshino Feb 28, 2015
f86d334
Add unstoppable source example
tyoshino Mar 1, 2015
f63d2de
Move all stuff for implementing operation concept to operation-queue
tyoshino Mar 1, 2015
8a19445
Tweak
tyoshino Mar 1, 2015
5eb050b
Reorganize promise sync methods
tyoshino Mar 1, 2015
9604a12
Reorganize methods based on their purpose
tyoshino Mar 1, 2015
27a72de
Cont'd
tyoshino Mar 1, 2015
ce30189
Split OperationQueue
tyoshino Mar 1, 2015
03e57d9
More reorg
tyoshino Mar 1, 2015
7f252ba
More
tyoshino Mar 1, 2015
19de1e0
Remove cancelled state from Writable and aborted state from Readable
tyoshino Mar 1, 2015
9e9fa78
File reorganization
tyoshino Mar 1, 2015
4527336
Add stream-base.js
tyoshino Mar 1, 2015
b0540ce
State check fix for read()
tyoshino Mar 1, 2015
0d54e6e
Add ThinStream.md
tyoshino Mar 1, 2015
3931c1b
Add close() to ThinStream
tyoshino Mar 1, 2015
06939ad
Thin Stream
tyoshino Mar 4, 2015
b79a7af
Rename
tyoshino Mar 4, 2015
6144e47
Mark work in progress files with a comment WIP WIP ...
tyoshino Mar 4, 2015
2ea1755
Update ThinStream.md
tyoshino Mar 4, 2015
52ec38f
Add ThinAsyncReadableByteStream
tyoshino Mar 8, 2015
4b9f438
Rename streams to Reader. Stream suffix would be given to the source …
tyoshino Mar 9, 2015
4be050f
Add ThinStreamReader delegate revocation
tyoshino Mar 9, 2015
a0d8aea
Implement ThinByobByteStreamReader
tyoshino Mar 11, 2015
406ac64
More tests and comments for ThinByobByteStreamReader
tyoshino Mar 12, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions ThinStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
```es6
class UnderlyingSink {
// delegate contains:
// - Back pressure signaling
// - markWaiting(): Tells the stream that backpressure is applied.
// - markWritable(): Tells the stream that backpressure is not applied.
// - onSpaceChange(): Tells the stream that get space() may return something new.
// - Error signaling
// - markErrored(error): Tells the stream that the sink is errored.
start(delegate)

// Takes value and do some processing. May return something. If backpressure state is changed,
// should call appropriate delegate function to update the stream.
write(value)
// Do something to finalize the sink. No more write() or close() comes. May return something.
close()
// Takes reason and do something to abort the sink. May return something.
abort(reason)

// Should returns a number indicating how much data can be written without causing backpressure.
// Maye return undefined if it's unknown.
get space()
}

class ThinWritableStream {
// At the end of construction, calls start() on underlyingSink with delegate functions for this
// stream.
constructor(underlyingSink)

// List of states in the form of:
// X -> A, B, C, ...
// X is state name or a name of a group of states.
// A, B, C, ... are the states X may transit to.
//
// - (write()/close() are allowed) -> "closed", "aborted", "errored"
// - "waiting" (backpressure not applied) -> "writable"
// - "writable" (backpressure applied) -> "waiting"
// - "closed" -> "aborted", "errored"
// - "aborted"
// - "errored"
//
// Distinction between "waiting" and "writable" is a part of the flow control interfaces.
get state()

// Main interfaces.

// Passes value and returns something returned by the underlying sink.
//
// Available in "waiting" and "writable" state.
write(value)
// Tells the underlying sink that no more data will be write()-en and returns something returned
// by the underlying sink.
//
// Available in "waiting" and "writable" state.
close()
// Tells the underlying sink that no more data will be write()-en and returns something returned
// by the underlying sink.
//
// Available in "waiting", "writable" and "closed" state.
abort(reason)

// Error receiving interfaces.

// Returns a promise which gets fulfilled when this instance enters the "errored" state.
get errored()
// Returns an object representing the error when the state is "errored".
get error()

// Flow control interfaces.

// Returns a promise which gets fulfilled when this instance enters "writable" state.
get ready()
// Returns the space available for write.
//
// Available in "waiting" and "writable" state.
get space()
// Returns a promise which gets fulfilled when space() becomes different value than one at the last
// waitSpaceChange() call.
//
// Available in "waiting" and "writable" state.
waitSpaceChange()
}

class UnderlyingSource {
// delegate contains:
// - Readability signaling
// - markWaiting(): Tells the stream that nothing is ready for synchronous reading.
// - markReadable(): Tells the stream that something is ready for synchronous reading.
// - markClosed(): Tells the stream that no more data will become available for read.
// - Error signaling
// - markErrored(error): Tells the stream that the source is errored.
start(delegate)

// Returns something generated. If data availability is changed, should call appropriate
// delegate function to update the stream.
read()
// Do something to cancel the source. May return something.
cancel(reason)

// Should interpret the given number v to update how much data to generate / pull and buffer
// in the source.
get onWindowUpdate(v)
}

class ThinReadableStream {
constructor(underlyingSource)

// - normal -> "cancelled", "errored", "closed"
// - "waiting" -> "readable"
// - "readable" -> "waiting"
// - "closed"
// - "cancelled"
// - "errored"
get state()

// Returns a promise which gets fulfilled when this instance enters "readable" or "closed" state.
get ready()
// Returns something returned by the underlying source.
//
// Available in "readable" state.
read()
// Passes reason and returns something returned by the underlying source.
//
// Available in "waiting" and "readable" state.
cancel(reason)

get errored()
get error()

// Flow control interfaces.

// Available in "waiting" and "readable" state.
get window()
// Passes v to the underlying source. v indicates how much data should be pulled.
//
// Available in "waiting" and "readable" state.
set window(v)
}
```
68 changes: 68 additions & 0 deletions reference-implementation/lib/experimental/byte-stream-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/// WIP WIP WIP WIP


pull(container) {
if (container === undefined) {
throw new TypeError('container is undefined');
}
if (container.constructor !== Uint8Array) {
throw new TypeError('the only supported container is Uint8Array');
}

this._pendingPulls.push(container);
this._resolvePendingPulls();
}

_resolvePendingPulls() {
let bytesConsumed = 0;
let enqueued = false;

while (this._pendingPulls.length > 0) {
const destView = this._pendingPulls.shift();
let destViewPosition = 0;

while (this._shared._queue.length > 0) {
if (destViewPosition === destView.byteLength) {
this._readableValueQueue.push(destView);
enqueued = true;
break;
}

const entry = this._shared._queue[0];

const srcView = entry.value;
if (srcView === undefined || srcView.constructor !== Uint8Array) {
console.log('not reached');
}

const bytesToCopy = Math.min(destView.byteLength - destViewPosition, srcView.byteLength);
destView.set(srcView.subarray(0, bytesToCopy), destViewPosition);
destViewPosition += bytesToCopy;

if (bytesToCopy === srcView.byteLength) {
this._shared._queue.shift();
} else {
this._shared[0] = srcView.subarray(bytesToCopy);
}
bytesConsumed += bytesToCopy;
}

if (this._shared._queue.length === 0) {
this._readableValueQueue.push(destView.subarray(0, destViewPosition));
enqueued = true;

if (this._shared._draining) {
while (this._pendingPulls.length > 0) {
const destView = this._pendingPulls.shift();
this._readableValueQueue.push(destView.subarray());
}
}
break;
}
}

this._delegate.markReadable();

this._shared._queueSize -= bytesConsumed;
this._sink.onQueueConsume();
}
Loading