forked from whatwg/streams
index.bs
<pre class=metadata>
Group: WHATWG
H1: Streams
Shortname: streams
Text Macro: TWITTER streamsstandard
Text Macro: LATESTRD 2023-08
Abstract: This specification provides APIs for creating, composing, and consuming streams of data
Abstract: that map efficiently to low-level I/O primitives.
Translation: ja https://triple-underscore.github.io/Streams-ja.html
!Demos: <a href="https://streams.spec.whatwg.org/demos/">streams.spec.whatwg.org/demos</a>
Indent: 1
Markup Shorthands: markdown yes
</pre>
<pre class=link-defaults>
spec:webidl; type:dfn; text:resolve
spec:webidl; type:dfn; text:new
spec:infra; type:dfn; text:list
spec:html; type:dfn; text:entangle
spec:html; type:dfn; text:message port post message steps
spec:html; type:dfn; text:port message queue
</pre>
<pre class="anchors">
urlPrefix: https://tc39.es/ecma262/; spec: ECMASCRIPT
 type: constructor
  text: %Uint8Array%; url: #sec-typedarray-objects
  text: %DataView%; url: #sec-dataview-constructor
  text: %ArrayBuffer%; url: #sec-arraybuffer-constructor
 type: interface
  text: ArrayBuffer; url: #sec-arraybuffer-objects
  text: DataView; url: #sec-dataview-objects
  text: Number; url: #sec-ecmascript-language-types-number-type
  text: SharedArrayBuffer; url: #sec-sharedarraybuffer-objects
  text: Uint8Array; url: #sec-typedarray-objects
  text: %Object.prototype%; url: #sec-properties-of-the-object-prototype-object
 type: dfn
  text: abstract operation; url: #sec-algorithm-conventions-abstract-operations
  text: array; url: #sec-array-objects
  text: async generator; url: #sec-asyncgenerator-objects
  text: async iterable; url: #sec-asynciterable-interface
  text: completion record; url: #sec-completion-record-specification-type
  text: internal slot; url: #sec-object-internal-methods-and-internal-slots
  text: iterable; url: #sec-iterable-interface
  text: realm; url: #sec-code-realms
  text: the current Realm; url: #current-realm
  text: the typed array constructors table; url: #table-49
  text: typed array; url: #sec-typedarray-objects
  text: Number type; url: #sec-ecmascript-language-types-number-type
  text: Data Block; url: #sec-data-blocks
 type: abstract-op
  text: Call; url: #sec-call
  text: CloneArrayBuffer; url: #sec-clonearraybuffer
  text: CopyDataBlockBytes; url: #sec-copydatablockbytes
  text: CreateArrayFromList; url: #sec-createarrayfromlist
  text: CreateBuiltinFunction; url: #sec-createbuiltinfunction
  text: CreateDataProperty; url: #sec-createdataproperty
  text: Construct; url: #sec-construct
  text: DetachArrayBuffer; url: #sec-detacharraybuffer
  text: Get; url: #sec-get-o-p
  text: GetIterator; url: #sec-getiterator
  text: GetMethod; url: #sec-getmethod
  text: GetV; url: #sec-getv
  text: IsDetachedBuffer; url: #sec-isdetachedbuffer
  text: IsInteger; url: #sec-isinteger
  text: IteratorComplete; url: #sec-iteratorcomplete
  text: IteratorNext; url: #sec-iteratornext
  text: IteratorValue; url: #sec-iteratorvalue
  text: OrdinaryObjectCreate; url: #sec-ordinaryobjectcreate
  text: SameValue; url: #sec-samevalue
  text: Type; url: #sec-ecmascript-data-types-and-values
 text: TypeError; url: #sec-native-error-types-used-in-this-standard-typeerror; type: exception
 text: map; url: #sec-array.prototype.map; type: method; for: Array.prototype
urlPrefix: https://webassembly.github.io/spec/js-api/; spec: WASM-JS-API-1
 type: interface
  text: Memory; url: #memory
 type: attribute
  text: buffer; for: Memory; url: #dom-memory-buffer
url: https://wicg.github.io/compression/#compressionstream; spec: COMPRESSION; type: interface; text: CompressionStream
</pre>
<style>
div.algorithm + div.algorithm { margin-top: 3em; }
</style>
<h2 id="intro">Introduction</h2>
<div class="non-normative">
<em>This section is non-normative.</em>
Large swathes of the web platform are built on streaming data: that is, data that is created,
processed, and consumed in an incremental fashion, without ever reading all of it into memory. The
Streams Standard provides a common set of APIs for creating and interfacing with such streaming
data, embodied in [=readable streams=], [=writable streams=], and [=transform streams=].
These APIs have been designed to efficiently map to low-level I/O primitives, including
specializations for byte streams where appropriate. They allow easy composition of multiple streams
into [=pipe chains=], or can be used directly via [=/readers=] and [=writers=]. Finally, they are
designed to automatically provide [=backpressure=] and queuing.
This standard provides the base stream primitives which other parts of the web platform can use to
expose their streaming data. For example, [[FETCH]] exposes {{Response}} bodies as
{{ReadableStream}} instances. More generally, the platform is full of streaming abstractions waiting
to be expressed as streams: multimedia streams, file streams, inter-global communication, and more
benefit from being able to process data incrementally instead of buffering it all into memory and
processing it in one go. By providing the foundation for these streams to be exposed to developers,
the Streams Standard enables use cases like:
* Video effects: piping a readable video stream through a transform stream that applies effects in
real time.
* Decompression: piping a file stream through a transform stream that selectively decompresses files
from a <kbd>.tgz</kbd> archive, turning them into <{img}> elements as the user scrolls through an
image gallery.
* Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into
bitmap data, and then through another transform that translates bitmaps into PNGs. If installed
inside the {{ServiceWorkerGlobalScope/fetch}} hook of a service worker, this would allow
developers to transparently polyfill new image formats. [[SERVICE-WORKERS]]
Web developers can also use the APIs described here to create their own streams, with the same APIs
as those provided by the platform. Other developers can then transparently compose platform-provided
streams with those supplied by libraries. In this way, the APIs described here provide a unifying
abstraction for all streams, encouraging an ecosystem to grow around these shared and composable
interfaces.
</div>
<h2 id="model">Model</h2>
A <dfn export>chunk</dfn> is a single piece of data that is written to or read from a stream. It can
be of any type; streams can even contain chunks of different types. A chunk will often not be the
most atomic unit of data for a given stream; for example, a byte stream might contain chunks
consisting of 16 KiB {{Uint8Array}}s, instead of single bytes.
<h3 id="rs-model">Readable streams</h3>
A <dfn export>readable stream</dfn> represents a source of data, from which you can read. In other
words, data comes <em>out</em> of a readable stream. Concretely, a readable stream is an instance of
the {{ReadableStream}} class.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a
lower-level I/O source, called the <dfn>underlying source</dfn>. There are two types of underlying
source: push sources and pull sources.
<dfn lt="push source">Push sources</dfn> push data at you, whether or not you are listening for it.
They may also provide a mechanism for pausing and resuming the flow of data. An example push source
is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be
controlled by changing the TCP window size.
<dfn lt="pull source">Pull sources</dfn> require you to request data from them. The data may be
available synchronously, e.g. if it is held by the operating system's in-memory buffers, or
asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where
you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface. For
web developer–created streams, the implementation details of a source are provided by <a
href="#underlying-source-api">an object with certain methods and properties</a> that is passed to
the {{ReadableStream()}} constructor.
[=Chunks=] are enqueued into the stream by the stream's [=underlying source=]. They can then be read
one at a time via the stream's public interface, in particular by using a [=readable stream reader=]
acquired using the stream's {{ReadableStream/getReader()}} method.
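<div class="example" id="example-sketch-pull-source">
The following is a minimal, non-normative sketch of a [=pull source=] wrapped by a readable stream
and drained through a reader. The helper names <code>makeArrayStream</code> and
<code>readAll</code> are hypothetical, and an in-memory array stands in for a real I/O source.

```javascript
// Hypothetical pull source: an in-memory array stands in for a real I/O source.
function makeArrayStream(items) {
  let index = 0;
  return new ReadableStream({
    // pull() is called whenever the stream wants more chunks in its queue.
    pull(controller) {
      if (index < items.length) {
        controller.enqueue(items[index++]);
      } else {
        controller.close();
      }
    }
  });
}

// Hypothetical consumer: drains a stream through a default reader.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      return chunks;
    }
    chunks.push(value);
  }
}

readAll(makeArrayStream(["a", "b", "c"])).then(chunks => console.log(chunks));
```

A push source would typically call <code>controller.enqueue()</code> from event listeners set up in
<code>start()</code> instead of waiting for <code>pull()</code>.
</div>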
Code that reads from a readable stream using its public interface is known as a <dfn>consumer</dfn>.
Consumers also have the ability to <dfn lt="cancel a readable stream">cancel</dfn> a readable
stream, using its {{ReadableStream/cancel()}} method. This indicates that the consumer has lost
interest in the stream, and will immediately close the stream, throw away any queued [=chunks=], and
execute any cancellation mechanism of the [=underlying source=].
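<div class="example" id="example-sketch-cancel">
As a rough, non-normative illustration of canceling, the sketch below cancels a stream that still
has a queued chunk. The function name <code>cancelDemo</code> and the reason string are made up for
this example.

```javascript
async function cancelDemo() {
  let cancelReason;
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue("queued but never read");
    },
    // The underlying source's cancellation mechanism receives the consumer's reason.
    cancel(reason) {
      cancelReason = reason;
    }
  });

  await stream.cancel("no longer needed");

  // The stream is now closed and its queue was discarded,
  // so a read reports done without yielding the queued chunk.
  const { value, done } = await stream.getReader().read();
  return { cancelReason, value, done };
}

cancelDemo().then(result => console.log(result));
```
</div>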
Consumers can also <dfn lt="tee a readable stream">tee</dfn> a readable stream using its
{{ReadableStream/tee()}} method. This will [=locked to a reader|lock=] the stream, making it
no longer directly usable; however, it will create two new streams, called <dfn lt="branches of a
readable stream tee">branches</dfn>, which can be consumed independently.
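<div class="example" id="example-sketch-tee">
A small, non-normative sketch of teeing follows. It assumes a source that enqueues two string
chunks; <code>collect</code> and <code>teeDemo</code> are hypothetical helper names.

```javascript
// Hypothetical helper: reads every chunk of a stream into an array.
async function collect(stream) {
  const reader = stream.getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      return chunks;
    }
    chunks.push(value);
  }
}

async function teeDemo() {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue("x");
      controller.enqueue("y");
      controller.close();
    }
  });

  const [branch1, branch2] = source.tee();
  const sourceLocked = source.locked; // teeing locks the original stream

  // Each branch sees every chunk and can be consumed independently.
  const [chunks1, chunks2] = await Promise.all([collect(branch1), collect(branch2)]);
  return { sourceLocked, chunks1, chunks2 };
}

teeDemo().then(result => console.log(result));
```
</div>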
For streams representing bytes, an extended version of the [=readable stream=] is provided to handle
bytes efficiently, in particular by minimizing copies. The [=underlying source=] for such a readable
stream is called an <dfn>underlying byte source</dfn>. A readable stream whose underlying source is
an underlying byte source is sometimes called a <dfn export>readable byte stream</dfn>. Consumers of
a readable byte stream can acquire a [=BYOB reader=] using the stream's
{{ReadableStream/getReader()}} method.
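<div class="example" id="example-sketch-byte-stream">
The sketch below, which is illustrative only, creates a readable byte stream by passing
<code>type: "bytes"</code> in the underlying source object and reads from it with a BYOB reader.
The function name <code>byobDemo</code> and the byte values are assumptions made for the example.

```javascript
async function byobDemo() {
  const stream = new ReadableStream({
    type: "bytes", // makes this an underlying byte source
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3, 4]));
      controller.close();
    }
  });

  const reader = stream.getReader({ mode: "byob" });

  // Supply our own 8-byte buffer; only 4 bytes are available, so the
  // returned view covers just the bytes that were written into it.
  const { value: view, done } = await reader.read(new Uint8Array(8));
  return {
    bytes: Array.from(view),
    bufferByteLength: view.buffer.byteLength,
    done
  };
}

byobDemo().then(result => console.log(result));
```
</div>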
<h3 id="ws-model">Writable streams</h3>
A <dfn export>writable stream</dfn> represents a destination for data, into which you can write. In
other words, data goes <em>in</em> to a writable stream. Concretely, a writable stream is an
instance of the {{WritableStream}} class.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the
<dfn>underlying sink</dfn>. Writable streams work to abstract away some of the complexity of the
underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by
one.
[=Chunks=] are written to the stream via its public interface, and are passed one at a time to the
stream's [=underlying sink=]. For web developer–created streams, the implementation details of the
sink are provided by <a href="#underlying-sink-api">an object with certain methods</a> that is
passed to the {{WritableStream()}} constructor.
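<div class="example" id="example-sketch-underlying-sink">
As a non-normative sketch, the code below wraps a trivial underlying sink, here just an array, and
writes to it through a writer. The name <code>writeDemo</code> and the <code>"&lt;closed&gt;"</code>
marker are invented for illustration.

```javascript
async function writeDemo() {
  const received = [];
  const stream = new WritableStream({
    // Called once per chunk, one at a time, in the order the chunks were written.
    write(chunk) {
      received.push(chunk);
    },
    // Called after all queued writes have been delivered.
    close() {
      received.push("<closed>");
    }
  });

  const writer = stream.getWriter();
  writer.write("a"); // queued by the stream
  writer.write("b"); // queued behind "a"
  await writer.close(); // resolves once both writes and close() have completed
  return received;
}

writeDemo().then(received => console.log(received));
```
</div>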
Code that writes into a writable stream using its public interface is known as a
<dfn>producer</dfn>.
Producers also have the ability to <dfn lt="abort a writable stream">abort</dfn> a writable stream,
using its {{WritableStream/abort()}} method. This indicates that the producer believes something has
gone wrong, and that future writes should be discontinued. It puts the stream in an errored state,
even without a signal from the [=underlying sink=], and it discards all writes in the stream's
[=internal queue=].
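<div class="example" id="example-sketch-abort">
The following sketch, offered as an illustration and not as normative text, aborts a writable
stream while one write is in flight and another is still queued. The sink is deliberately slow, and
the names <code>abortDemo</code> and <code>tick</code> as well as the timer values are assumptions
chosen so that the first write has reached the sink before the abort.

```javascript
// Hypothetical helper: resolves after roughly `ms` milliseconds.
const tick = ms => new Promise(resolve => setTimeout(resolve, ms));

async function abortDemo() {
  const delivered = [];
  let abortReason;
  const stream = new WritableStream({
    write(chunk) {
      delivered.push(chunk);
      return tick(20); // a slow sink, so later writes stay queued
    },
    abort(reason) {
      abortReason = reason;
    }
  });

  const writer = stream.getWriter();
  const describe = promise =>
    promise.then(() => "fulfilled", reason => `rejected: ${reason}`);
  const first = describe(writer.write("in flight"));
  const second = describe(writer.write("queued"));

  await tick(0); // let the stream start and hand "in flight" to the sink
  await writer.abort("something went wrong");

  return { delivered, abortReason, first: await first, second: await second };
}

abortDemo().then(result => console.log(result));
```

In this sketch the write that had already reached the sink is allowed to finish, while the queued
write is discarded and its promise rejects with the abort reason.
</div>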
<h3 id="ts-model">Transform streams</h3>
A <dfn export>transform stream</dfn> consists of a pair of streams: a [=writable stream=], known as
its <dfn export>writable side</dfn>, and a [=readable stream=], known as its <dfn export>readable
side</dfn>. In a manner specific to the transform stream in question, writes to the writable side
result in new data being made available for reading from the readable side.
Concretely, any object with a <code>writable</code> property and a <code>readable</code> property
can serve as a transform stream. However, the standard {{TransformStream}} class makes it much
easier to create such a pair that is properly entangled. It wraps a <dfn>transformer</dfn>, which
defines algorithms for the specific transformation to be performed. For web developer–created
streams, the implementation details of a transformer are provided by <a href="#transformer-api">an
object with certain methods and properties</a> that is passed to the {{TransformStream()}}
constructor. Other specifications might use the {{GenericTransformStream}} mixin to create classes
with the same <code>writable</code>/<code>readable</code> property pair but other custom APIs
layered on top.
An <dfn export>identity transform stream</dfn> is a type of transform stream which forwards all
[=chunks=] written to its [=writable side=] to its [=readable side=], without any changes. This can
be useful in <a href="#example-transform-identity">a variety of scenarios</a>. By default, the
{{TransformStream}} constructor will create an identity transform stream, when no
{{Transformer/transform|transform()}} method is present on the [=transformer=] object.
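<div class="example" id="example-sketch-identity-transform">
A brief, non-normative sketch of an identity transform stream: a chunk written to the writable side
comes out of the readable side unchanged. The function name <code>identityDemo</code> is
hypothetical.

```javascript
async function identityDemo() {
  // No transformer argument, so this is an identity transform stream.
  const { writable, readable } = new TransformStream();
  const writer = writable.getWriter();
  const reader = readable.getReader();

  // Start the write without awaiting it yet: with the default strategies
  // it only completes once the readable side is read from.
  const writePromise = writer.write("hello");
  const { value } = await reader.read();
  await writePromise;
  return value;
}

identityDemo().then(value => console.log(value));
```
</div>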
Some examples of potential transform streams include:
* A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are
read;
* A video decoder, to which encoded bytes are written and from which uncompressed video frames are
read;
* A text decoder, to which bytes are written and from which strings are read;
* A CSV-to-JSON converter, to which strings representing lines of a CSV file are written and from
which corresponding JavaScript objects are read.
<h3 id="pipe-chains">Pipe chains and backpressure</h3>
Streams are primarily used by <dfn>piping</dfn> them to each other. A readable stream can be piped
directly to a writable stream, using its {{ReadableStream/pipeTo()}} method, or it can be piped
through one or more transform streams first, using its {{ReadableStream/pipeThrough()}} method.
A set of streams piped together in this way is referred to as a <dfn>pipe chain</dfn>. In a pipe
chain, the <dfn>original source</dfn> is the [=underlying source=] of the first readable stream in
the chain; the <dfn>ultimate sink</dfn> is the [=underlying sink=] of the final writable stream in
the chain.
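<div class="example" id="example-sketch-pipe-chain">
To make the terminology concrete, here is a small, non-normative sketch of a pipe chain with one
transform stream between the original source and the ultimate sink. The upper-casing transformer
and the name <code>pipeChainDemo</code> are illustrative assumptions.

```javascript
async function pipeChainDemo() {
  const output = [];

  // Original source: two string chunks, then close.
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue("a");
      controller.enqueue("b");
      controller.close();
    }
  });

  // A transform stream that upper-cases each chunk.
  const upperCase = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    }
  });

  // Ultimate sink: collects whatever reaches the end of the chain.
  const sink = new WritableStream({
    write(chunk) {
      output.push(chunk);
    }
  });

  await source.pipeThrough(upperCase).pipeTo(sink);
  return output;
}

pipeChainDemo().then(output => console.log(output));
```
</div>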
Once a pipe chain is constructed, it will propagate signals regarding how fast [=chunks=] should
flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards
through the pipe chain, until eventually the original source is told to stop producing chunks so
fast. This process of normalizing flow from the original source according to how fast the chain can
process chunks is called <dfn>backpressure</dfn>.
Concretely, the [=original source=] is given the
{{ReadableStreamDefaultController/desiredSize|controller.desiredSize}} (or
{{ReadableByteStreamController/desiredSize|byteController.desiredSize}}) value, and can then adjust
its rate of data flow accordingly. This value is derived from the
{{WritableStreamDefaultWriter/desiredSize|writer.desiredSize}} corresponding to the [=ultimate
sink=], which gets updated as the ultimate sink finishes writing [=chunks=]. The
{{ReadableStream/pipeTo()}} method used to construct the chain automatically ensures this
information propagates back through the [=pipe chain=].
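<div class="example" id="example-sketch-controller-desired-size">
The sketch below is a non-normative illustration of how an underlying source could observe
<code>controller.desiredSize</code> as it enqueues chunks. It assumes a high water mark of 2 and no
consumer; the function name <code>observeDesiredSize</code> is hypothetical.

```javascript
function observeDesiredSize() {
  const observed = [];
  new ReadableStream({
    // start() runs synchronously while the stream is being constructed.
    start(controller) {
      observed.push(controller.desiredSize); // queue is empty
      for (const chunk of ["a", "b", "c"]) {
        controller.enqueue(chunk);
        observed.push(controller.desiredSize); // shrinks by one per queued chunk
      }
    }
  }, { highWaterMark: 2 });
  return observed;
}

console.log(observeDesiredSize());
```

A well-behaved source would stop enqueuing once the value reaches zero or below, and resume when it
is asked to pull again.
</div>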
When [=tee a readable stream|teeing=] a readable stream, the [=backpressure=] signals from its two
[=branches of a readable stream tee|branches=] will aggregate, such that if neither branch is read
from, a backpressure signal will be sent to the [=underlying source=] of the original stream.
Piping [=locks=] the readable and writable streams, preventing them from being manipulated for the
duration of the pipe operation. This allows the implementation to perform important optimizations,
such as directly shuttling data from the underlying source to the underlying sink while bypassing
many of the intermediate queues.
<h3 id="queuing-strategies">Internal queues and queuing strategies</h3>
Both readable and writable streams maintain <dfn>internal queues</dfn>, which they use for similar
purposes. In the case of a readable stream, the internal queue contains [=chunks=] that have been
enqueued by the [=underlying source=], but not yet read by the consumer. In the case of a writable
stream, the internal queue contains [=chunks=] which have been written to the stream by the
producer, but not yet processed and acknowledged by the [=underlying sink=].
A <dfn>queuing strategy</dfn> is an object that determines how a stream should signal
[=backpressure=] based on the state of its [=internal queue=]. The queuing strategy assigns a size
to each [=chunk=], and compares the total size of all chunks in the queue to a specified number,
known as the <dfn export>high water mark</dfn>. The resulting difference, high water mark minus
total size, is used to determine the <dfn lt="desired size to fill a stream's internal
queue">desired size to fill the stream's queue</dfn>.
For readable streams, an underlying source can use this desired size as a backpressure signal,
slowing down chunk generation so as to try to keep the desired size above or at zero. For writable
streams, a producer can behave similarly, avoiding writes that would cause the desired size to go
negative.
<a href="#qs-api">Concretely</a>, a queuing strategy for web developer–created streams is given by
any JavaScript object with a {{QueuingStrategy/highWaterMark}} property. For byte streams the
{{QueuingStrategy/highWaterMark}} always has units of bytes. For other streams the default unit is
[=chunks=], but a {{QueuingStrategy/size|size()}} function can be included in the strategy object
which returns the size for a given chunk. This permits the {{QueuingStrategy/highWaterMark}} to be
specified in arbitrary floating-point units.
<!-- TODO: https://github.com/whatwg/streams/issues/427 -->
<div class="example" id="example-simple-queuing-strategy">
A simple example of a queuing strategy would be one that assigns a size of one to each chunk, and
has a high water mark of three. This would mean that up to three chunks could be enqueued in a
readable stream, or three chunks written to a writable stream, before the streams are considered to
be applying backpressure.
In JavaScript, such a strategy could be written manually as <code highlight="js">{ highWaterMark:
3, size() { return 1; }}</code>, or using the built-in {{CountQueuingStrategy}} class, as <code
highlight="js">new CountQueuingStrategy({ highWaterMark: 3 })</code>.
</div>
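<div class="example" id="example-sketch-desired-size-arithmetic">
As a worked, non-normative illustration of "high water mark minus total size", the sketch below
uses a strategy whose <code>size()</code> function measures string length, with a high water mark
of 10. The sink never finishes a write, so every chunk stays in the queue. The function name
<code>writerDesiredSizes</code> and the chunk values are assumptions for this example.

```javascript
function writerDesiredSizes() {
  const stream = new WritableStream(
    {
      // A sink that never acknowledges a write, so chunks remain queued.
      write() {
        return new Promise(() => {});
      }
    },
    {
      highWaterMark: 10,
      size(chunk) {
        return chunk.length; // chunk size measured in characters
      }
    }
  );

  const writer = stream.getWriter();
  const sizes = [writer.desiredSize]; // 10 - 0
  for (const chunk of ["abcd", "efghij", "k"]) {
    writer.write(chunk);
    sizes.push(writer.desiredSize); // 10 - 4, then 10 - 10, then 10 - 11
  }
  return sizes;
}

console.log(writerDesiredSizes());
```

A producer that respects backpressure would stop writing once the desired size reaches zero, for
instance by awaiting <code>writer.ready</code> before each write.
</div>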
<h3 id="locking">Locking</h3>
A <dfn lt="reader|readable stream reader">readable stream reader</dfn>, or simply reader, is an
object that allows direct reading of [=chunks=] from a [=readable stream=]. Without a reader, a
[=consumer=] can only perform high-level operations on the readable stream: [=cancel a readable
stream|canceling=] the stream, or [=piping=] the readable stream to a writable stream. A reader is
acquired via the stream's {{ReadableStream/getReader()}} method.
A [=readable byte stream=] has the ability to vend two types of readers: <dfn export lt="default
reader">default readers</dfn> and <dfn export lt="BYOB reader">BYOB readers</dfn>. BYOB ("bring your
own buffer") readers allow reading into a developer-supplied buffer, thus minimizing copies. A
non-byte readable stream can only vend default readers. Default readers are instances of the
{{ReadableStreamDefaultReader}} class, while BYOB readers are instances of
{{ReadableStreamBYOBReader}}.
Similarly, a <dfn lt="writer|writable stream writer">writable stream writer</dfn>, or simply
writer, is an object that allows direct writing of [=chunks=] to a [=writable stream=]. Without a
writer, a [=producer=] can only perform the high-level operations of [=abort a writable
stream|aborting=] the stream or [=piping=] a readable stream to the writable stream. Writers are
represented by the {{WritableStreamDefaultWriter}} class.
<p class="note">Under the covers, these high-level operations actually use a reader or writer
themselves.</p>
A given readable or writable stream has at most one reader or writer at a time. We say in this
case the stream is <dfn lt="lock|locked to a reader|locked to a writer">locked</dfn>, and that the
reader or writer is <dfn lt="active|active reader|active writer">active</dfn>. This state can be
determined using the {{ReadableStream/locked|readableStream.locked}} or
{{WritableStream/locked|writableStream.locked}} properties.
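<div class="example" id="example-sketch-locked">
A short, non-normative sketch of locking: acquiring a reader flips the <code>locked</code> property,
and a second reader cannot be acquired while the first is active. The function name
<code>lockingDemo</code> is hypothetical.

```javascript
function lockingDemo() {
  const stream = new ReadableStream();
  const lockedBefore = stream.locked;

  const reader = stream.getReader(); // this reader is now the active reader
  const lockedAfter = stream.locked;

  // Trying to acquire a second reader while the stream is locked throws.
  let secondReaderError = null;
  try {
    stream.getReader();
  } catch (e) {
    secondReaderError = e;
  }

  return {
    lockedBefore,
    lockedAfter,
    threwTypeError: secondReaderError instanceof TypeError
  };
}

console.log(lockingDemo());
```
</div>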
A reader or writer also has the capability to <dfn lt="release a lock|release a read lock|release a
write lock">release its lock</dfn>, which makes it no longer active, and allows further readers or
writers to be acquired. This is done via the
{{ReadableStreamDefaultReader/releaseLock()|defaultReader.releaseLock()}},
{{ReadableStreamBYOBReader/releaseLock()|byobReader.releaseLock()}}, or
{{WritableStreamDefaultWriter/releaseLock()|writer.releaseLock()}} method, as appropriate.
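<div class="example" id="example-sketch-release-lock">
The following non-normative sketch shows a writer releasing its lock so that another writer can be
acquired. The function name <code>releaseLockDemo</code> is an assumption for illustration.

```javascript
function releaseLockDemo() {
  const stream = new WritableStream();

  const firstWriter = stream.getWriter();
  firstWriter.releaseLock(); // firstWriter is no longer active
  const lockedAfterRelease = stream.locked;

  // With the lock released, a new writer can be acquired.
  const secondWriter = stream.getWriter();
  return {
    lockedAfterRelease,
    lockedAgain: stream.locked,
    distinctWriters: firstWriter !== secondWriter
  };
}

console.log(releaseLockDemo());
```
</div>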
<h2 id="conventions">Conventions</h2>
This specification depends on the Infra Standard. [[!INFRA]]
This specification uses the [=abstract operation=] concept from the JavaScript specification for its
internal algorithms. This includes treating their return values as [=completion records=], and the
use of ! and ? prefixes for unwrapping those completion records. [[!ECMASCRIPT]]
This specification also uses the [=internal slot=] concept and notation from the JavaScript
specification. (The internal slots, however, are on Web IDL [=platform objects=] rather than on
JavaScript objects.)
<p class="note">The reasons for the usage of these foreign JavaScript specification conventions are
largely historical. We urge you to avoid following our example when writing your own web
specifications.</p>
In this specification, all numbers are represented as double-precision 64-bit IEEE 754 floating
point values (like the JavaScript [=Number type=] or Web IDL {{unrestricted double}} type), and all
arithmetic operations performed on them must be done in the standard way for such values. This is
particularly important for the data structure described in [[#queue-with-sizes]]. [[!IEEE-754]]
<h2 id="rs">Readable streams</h2>
<h3 id="rs-intro">Using readable streams</h3>
<div class="example" id="example-basic-pipe-to">
The simplest way to consume a readable stream is to [=piping|pipe=] it to a [=writable
stream=]. This ensures that [=backpressure=] is respected, and any errors (either writing or
reading) are propagated through the chain:
<xmp highlight="js">
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
</xmp>
</div>
<div class="example" id="example-pipe-as-chunks-receiver">
If you simply want to be alerted of each new chunk from a readable stream, you can [=piping|pipe=]
it to a new [=writable stream=] that you custom-create for that purpose:
<xmp highlight="js">
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));
</xmp>
By returning promises from your {{UnderlyingSink/write|write()}} implementation, you can signal
[=backpressure=] to the readable stream.
</div>
<div class="example" id="example-manual-read">
Although readable streams will usually be used by piping them to a writable stream, you can also
read them directly by acquiring a [=/reader=] and using its <code>read()</code> method to get
successive chunks. For example, this code logs the next [=chunk=] in the stream, if available:
<xmp highlight="js">
const reader = readableStream.getReader();

reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);
</xmp>
This more manual method of reading a stream is mainly useful for library authors building new
high-level operations on streams, beyond the provided ones of [=piping=] and [=tee a readable
stream|teeing=].
</div>
<div class="example" id="example-manual-read-bytes">
The above example showed using the readable stream's [=default reader=]. If the stream is a
[=readable byte stream=], you can also acquire a [=BYOB reader=] for it, which allows more
precise control over buffer allocation in order to avoid copies. For example, this code reads the
first 1024 bytes from the stream into a single memory buffer:
<xmp highlight="js">
const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes: ", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
      await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}
</xmp>
An important thing to note here is that the final <code>buffer</code> value is different from the
<code>startingAB</code>, but it (and all intermediate buffers) shares the same backing memory
allocation. At each step, the buffer is <a href="#transfer-array-buffer">transferred</a> to a new
{{ArrayBuffer}} object. The <code>view</code> is destructured from the return value of reading a
new {{Uint8Array}}, with that {{ArrayBuffer}} object as its <code>buffer</code> property, the
offset that bytes were written to as its <code>byteOffset</code> property, and the number of
bytes that were written as its <code>byteLength</code> property.
Note that this example is mostly educational. For practical purposes, the
{{ReadableStreamBYOBReaderReadOptions/min}} option of {{ReadableStreamBYOBReader/read()}}
provides an easier and more direct way to read an exact number of bytes:
<xmp highlight="js">
const reader = readableStream.getReader({ mode: "byob" });
const { value: view, done } = await reader.read(new Uint8Array(1024), { min: 1024 });
console.log("The first 1024 bytes: ", view);
</xmp>
</div>
<h3 id="rs-class">The {{ReadableStream}} class</h3>
The {{ReadableStream}} class is a concrete instance of the general [=readable stream=] concept. It
is adaptable to any [=chunk=] type, and maintains an internal queue to keep track of data supplied
by the [=underlying source=] but not yet read by any consumer.
<h4 id="rs-class-definition">Interface definition</h4>
The Web IDL definition for the {{ReadableStream}} class is given as follows:
<xmp class="idl">
[Exposed=*, Transferable]
interface ReadableStream {
  constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

  static ReadableStream from(any asyncIterable);

  readonly attribute boolean locked;

  Promise<undefined> cancel(optional any reason);
  ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {});
  ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {});
  Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
  sequence<ReadableStream> tee();

  async iterable<any>(optional ReadableStreamIteratorOptions options = {});
};

typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;

enum ReadableStreamReaderMode { "byob" };

dictionary ReadableStreamGetReaderOptions {
  ReadableStreamReaderMode mode;
};

dictionary ReadableStreamIteratorOptions {
  boolean preventCancel = false;
};

dictionary ReadableWritablePair {
  required ReadableStream readable;
  required WritableStream writable;
};

dictionary StreamPipeOptions {
  boolean preventClose = false;
  boolean preventAbort = false;
  boolean preventCancel = false;
  AbortSignal signal;
};
</xmp>
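<div class="example" id="example-sketch-pipe-options">
As one non-normative illustration of the <code>StreamPipeOptions</code> dictionary above, the
sketch below pipes two readable streams into the same writable stream in sequence, using
<code>preventClose</code> so that the first pipe leaves the destination open. The helper names
<code>streamOf</code> and <code>concatDemo</code> are hypothetical.

```javascript
// Hypothetical helper: a readable stream that yields the given chunks and closes.
function streamOf(...chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) {
        controller.enqueue(chunk);
      }
      controller.close();
    }
  });
}

async function concatDemo() {
  const received = [];
  let closed = false;
  const destination = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
    close() {
      closed = true;
    }
  });

  // preventClose keeps the destination open after the first source ends.
  await streamOf("a", "b").pipeTo(destination, { preventClose: true });
  const closedAfterFirstPipe = closed;

  // The second pipe uses the defaults, so it closes the destination when done.
  await streamOf("c").pipeTo(destination);
  return { received, closedAfterFirstPipe, closed };
}

concatDemo().then(result => console.log(result));
```
</div>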
<h4 id="rs-internal-slots">Internal slots</h4>
Instances of {{ReadableStream}} are created with the internal slots described in the following
table:
<table dfn-for="ReadableStream">
<thead>
<tr>
<th>Internal Slot
<th>Description (<em>non-normative</em>)
<tbody>
<tr>
<td><dfn>\[[controller]]</dfn>
<td class="non-normative">A {{ReadableStreamDefaultController}} or
{{ReadableByteStreamController}} created with the ability to control the state and queue of this
stream
<tr>
<td><dfn export>\[[Detached]]</dfn>
<td class="non-normative">A boolean flag set to true when the stream is transferred
<tr>
<td><dfn>\[[disturbed]]</dfn>
<td class="non-normative">A boolean flag set to true when the stream has been read from or
canceled
<tr>
<td><dfn>\[[reader]]</dfn>
<td class="non-normative">A {{ReadableStreamDefaultReader}} or {{ReadableStreamBYOBReader}}
instance, if the stream is [=locked to a reader=], or undefined if it is not
<tr>
<td><dfn>\[[state]]</dfn>
<td class="non-normative">A string containing the stream's current state, used internally; one
of "<code>readable</code>", "<code>closed</code>", or "<code>errored</code>"
<tr>
<td><dfn>\[[storedError]]</dfn>
<td class="non-normative">A value indicating how the stream failed, to be given as a failure
reason or exception when trying to operate on an errored stream
</table>
<h4 id="underlying-source-api">The underlying source API</h4>
The {{ReadableStream()}} constructor accepts as its first argument a JavaScript object representing
the [=underlying source=]. Such objects can contain any of the following properties:
<xmp class="idl">
dictionary UnderlyingSource {
  UnderlyingSourceStartCallback start;
  UnderlyingSourcePullCallback pull;
  UnderlyingSourceCancelCallback cancel;
  ReadableStreamType type;
  [EnforceRange] unsigned long long autoAllocateChunkSize;
};
typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController;
callback UnderlyingSourceStartCallback = any (ReadableStreamController controller);
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);
enum ReadableStreamType { "bytes" };
</xmp>
<dl>
<dt><dfn dict-member for="UnderlyingSource" lt="start">start(<var ignore>controller</var>)</dfn></dt>
<dd>
<p>A function that is called immediately during creation of the {{ReadableStream}}.
<p>Typically this is used to adapt a [=push source=] by setting up relevant event listeners, as
in the example of [[#example-rs-push-no-backpressure]], or to acquire access to a
[=pull source=], as in [[#example-rs-pull]].
<p>If this setup process is asynchronous, it can return a promise to signal success or failure;
a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the
{{ReadableStream()}} constructor.
<dt><dfn dict-member for="UnderlyingSource" lt="pull">pull(<var ignore>controller</var>)</dfn></dt>
<dd>
<p>A function that is called whenever the stream's [=internal queue=] of chunks becomes not full,
i.e. whenever the queue's [=desired size to fill a stream's internal queue|desired size=] becomes
positive. Generally, it will be called repeatedly until the queue reaches its [=high water mark=]
(i.e. until the <a lt="desired size to fill a stream's internal queue">desired size</a> becomes
non-positive).
<p>For [=push sources=], this can be used to resume a paused flow, as in
[[#example-rs-push-backpressure]]. For [=pull sources=], it is used to acquire new [=chunks=] to
enqueue into the stream, as in [[#example-rs-pull]].
<p>This function will not be called until {{UnderlyingSource/start|start()}} successfully
completes. Additionally, it will only be called repeatedly if it enqueues at least one chunk or
fulfills a BYOB request; a no-op {{UnderlyingSource/pull|pull()}} implementation will not be
continually called.
<p>If the function returns a promise, then it will not be called again until that promise
fulfills. (If the promise rejects, the stream will become errored.) This is mainly used in the
case of pull sources, where the promise returned represents the process of acquiring a new chunk.
Throwing an exception is treated the same as returning a rejected promise.
<dt><dfn dict-member for="UnderlyingSource" lt="cancel">cancel(<var ignore>reason</var>)</dfn></dt>
<dd>
<p>A function that is called whenever the [=consumer=] [=cancel a readable stream|cancels=] the
stream, via {{ReadableStream/cancel()|stream.cancel()}} or
{{ReadableStreamGenericReader/cancel()|reader.cancel()}}. It takes as its argument the same
value as was passed to those methods by the consumer.
<p>Readable streams can additionally be canceled under certain conditions during [=piping=]; see
the definition of the {{ReadableStream/pipeTo()}} method for more details.
<p>For all streams, this is generally used to release access to the underlying resource; see for
example [[#example-rs-push-no-backpressure]].
<p>If the shutdown process is asynchronous, it can return a promise to signal success or failure;
the result will be communicated via the return value of the <code>cancel()</code> method that was
called. Throwing an exception is treated the same as returning a rejected promise.
<div class="note">
<p>Even if the cancelation process fails, the stream will still close; it will not be put into
an errored state. This is because a failure in the cancelation process doesn't matter to the
consumer's view of the stream, once they've expressed a lack of interest in it by canceling. The
failure is only communicated to the immediate caller of the corresponding method.
<p>This is different from the behavior of the {{UnderlyingSink/close}} and
{{UnderlyingSink/abort}} options of a {{WritableStream}}'s [=underlying sink=], which upon
failure put the corresponding {{WritableStream}} into an errored state. Those correspond to
specific actions the [=producer=] is requesting and, if those actions fail, they indicate
something more persistently wrong.
</div>
<dt><dfn dict-member for="UnderlyingSource" lt="type"><code>type</code></dfn> (byte streams
only)</dt>
<dd>
<p>Can be set to "<dfn enum-value for="ReadableStreamType">bytes</dfn>" to signal that the
constructed {{ReadableStream}} is a <a>readable byte stream</a>. This ensures that the resulting
{{ReadableStream}} will successfully be able to vend [=BYOB readers=] via its
{{ReadableStream/getReader()}} method. It also affects the |controller| argument passed to the
{{UnderlyingSource/start|start()}} and {{UnderlyingSource/pull|pull()}} methods; see below.
<p>For an example of how to set up a readable byte stream, including using the different
controller interface, see [[#example-rbs-push]].
<p>Setting any value other than "{{ReadableStreamType/bytes}}" or undefined will cause the
{{ReadableStream()}} constructor to throw an exception.
<dt><dfn dict-member for="UnderlyingSource"
lt="autoAllocateChunkSize"><code>autoAllocateChunkSize</code></dfn> (byte streams only)</dt>
<dd>
<p>Can be set to a positive integer to cause the implementation to automatically allocate buffers
for the underlying source code to write into. In this case, when a [=consumer=] is using a
[=default reader=], the stream implementation will automatically allocate an {{ArrayBuffer}} of
the given size, so that {{ReadableByteStreamController/byobRequest|controller.byobRequest}} is
always present, as if the consumer was using a [=BYOB reader=].
<p>This is generally used to cut down on the amount of code needed to handle consumers that use
default readers, as can be seen by comparing [[#example-rbs-push]] without auto-allocation to
[[#example-rbs-pull]] with auto-allocation.
</dl>
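<div class="example">
The following is a minimal sketch of how {{UnderlyingSource/start|start()}},
{{UnderlyingSource/pull|pull()}}, and {{UnderlyingSource/cancel|cancel()}} might fit together for
a simple [=pull source=]. The <code>makeCounterStream()</code> and <code>readAll()</code> helpers
are hypothetical names introduced only for this illustration.

```javascript
// Hypothetical helper: a readable stream producing the integers 1..limit on demand.
function makeCounterStream(limit) {
  let next = 1;
  return new ReadableStream({
    start(controller) {
      // Called synchronously by the constructor. A real source would open a
      // file or socket here; this sketch has nothing to set up.
    },
    pull(controller) {
      // Called when the internal queue has room. Enqueue one chunk per call,
      // or close the stream once the sequence is exhausted.
      if (next > limit) {
        controller.close();
      } else {
        controller.enqueue(next++);
      }
    },
    cancel(reason) {
      // Called when the consumer cancels. Stop producing further values.
      next = limit + 1;
    }
  });
}

// Hypothetical helper: drains a stream into an array of chunks.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) {
      return chunks;
    }
    chunks.push(value);
  }
}
```

Assuming the default [=queuing strategy=], <code>pull()</code> is expected to run once after
<code>start()</code> completes and then again each time a read leaves room in the queue, so the
integers are produced only as fast as they are consumed.
</div>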
The type of the |controller| argument passed to the {{UnderlyingSource/start|start()}} and
{{UnderlyingSource/pull|pull()}} methods depends on the value of the {{UnderlyingSource/type}}
option. If {{UnderlyingSource/type}} is set to undefined (including via omission), then
|controller| will be a {{ReadableStreamDefaultController}}. If it's set to
"{{ReadableStreamType/bytes}}", then |controller| will be a {{ReadableByteStreamController}}.
<h4 id="rs-prototype">Constructor, methods, and properties</h4>
<dl class="domintro non-normative">
<dt><code><var ignore>stream</var> = new {{ReadableStream/constructor(underlyingSource, strategy)|ReadableStream}}(<var ignore>underlyingSource</var>[, <var ignore>strategy</var>])</code>
<dd>
<p>Creates a new {{ReadableStream}} wrapping the provided [=underlying source=]. See
[[#underlying-source-api]] for more details on the <var ignore>underlyingSource</var> argument.
<p>The |strategy| argument represents the stream's [=queuing strategy=], as described in
[[#qs-api]]. If it is not provided, the default behavior will be the same as a
{{CountQueuingStrategy}} with a [=high water mark=] of 1.
<dt><code><var ignore>stream</var> = {{ReadableStream/from(asyncIterable)|ReadableStream.from}}(<var ignore>asyncIterable</var>)</code>
<dd>
<p>Creates a new {{ReadableStream}} wrapping the provided [=iterable=] or [=async iterable=].
<p>This can be used to adapt various kinds of objects into a [=readable stream=], such as an
[=array=], an [=async generator=], or a <a
href="https://nodejs.org/api/stream.html#class-streamreadable">Node.js readable stream</a>.
<dt><code><var ignore>isLocked</var> = <var ignore>stream</var>.{{ReadableStream/locked}}</code>
<dd>
<p>Returns whether or not the readable stream is [=locked to a reader=].
<dt><code>await <var ignore>stream</var>.{{ReadableStream/cancel(reason)|cancel}}([ <var ignore>reason</var> ])</code>
<dd>
<p>[=cancel a readable stream|Cancels=] the stream, signaling a loss of interest in the stream by
a consumer. The supplied <var ignore>reason</var> argument will be given to the underlying
source's {{UnderlyingSource/cancel|cancel()}} method, which might or might not use it.
<p>The returned promise will fulfill if the stream shuts down successfully, or reject if the
underlying source signaled that there was an error doing so. Additionally, it will reject with a
{{TypeError}} (without attempting to cancel the stream) if the stream is currently [=locked to a
reader|locked=].
<dt><code><var ignore>reader</var> = <var ignore>stream</var>.{{ReadableStream/getReader(options)|getReader}}()</code>
<dd>
<p>Creates a {{ReadableStreamDefaultReader}} and [=locked to a reader|locks=] the stream to the
new reader. While the stream is locked, no other reader can be acquired until this one is
[=release a read lock|released=].
<p>This functionality is especially useful for creating abstractions that desire the ability to
consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else
can interleave reads with yours or cancel the stream, which would interfere with your
abstraction.
<dt><code><var ignore>reader</var> = <var ignore>stream</var>.{{ReadableStream/getReader(options)|getReader}}({ {{ReadableStreamGetReaderOptions/mode}}: "{{ReadableStreamReaderMode/byob}}" })</code>
<dd>
<p>Creates a {{ReadableStreamBYOBReader}} and [=locked to a reader|locks=] the stream to the new
reader.
<p>This call behaves the same way as the no-argument variant, except that it only works on
[=readable byte streams=], i.e. streams which were constructed specifically with the ability to
handle "bring your own buffer" reading. The returned [=BYOB reader=] provides the ability to
directly read individual [=chunks=] from the stream via its {{ReadableStreamBYOBReader/read()}}
method, into developer-supplied buffers, allowing more precise control over allocation.
<dt><code><var ignore>readable</var> = <var ignore>stream</var>.{{ReadableStream/pipeThrough(transform, options)|pipeThrough}}({ {{ReadableWritablePair/writable}}, {{ReadableWritablePair/readable}} }[, { {{StreamPipeOptions/preventClose}}, {{StreamPipeOptions/preventAbort}}, {{StreamPipeOptions/preventCancel}}, {{StreamPipeOptions/signal}} }])</code></dt>
<dd>
<p>Provides a convenient, chainable way of [=piping=] this [=readable stream=] through a
[=transform stream=] (or any other <code>{ writable, readable }</code> pair). It simply pipes the
stream into the writable side of the supplied pair, and returns the readable side for further use.
<p>Piping a stream will [=locked to a reader|lock=] it for the duration of the pipe, preventing
any other consumer from acquiring a reader.
<dt><code>await <var ignore>stream</var>.{{ReadableStream/pipeTo(destination, options)|pipeTo}}(<var ignore>destination</var>[, { {{StreamPipeOptions/preventClose}}, {{StreamPipeOptions/preventAbort}}, {{StreamPipeOptions/preventCancel}}, {{StreamPipeOptions/signal}} }])</code></dt>
<dd>
<p>[=piping|Pipes=] this [=readable stream=] to a given [=writable stream=] |destination|. The
way in which the piping process behaves under various error conditions can be customized with a
number of passed options. It returns a promise that fulfills when the piping process completes
successfully, or rejects if any errors were encountered.
<p>Piping a stream will [=locked to a reader|lock=] it for the duration of the pipe, preventing any
other consumer from acquiring a reader.
<p>Errors and closures of the source and destination streams propagate as follows:
* An error in this source [=readable stream=] will [=abort a writable stream|abort=]
|destination|, unless {{StreamPipeOptions/preventAbort}} is truthy. The returned promise will be
rejected with the source's error, or with any error that occurs during aborting the destination.
* An error in |destination| will [=cancel a readable stream|cancel=] this source [=readable
stream=], unless {{StreamPipeOptions/preventCancel}} is truthy. The returned promise will be
rejected with the destination's error, or with any error that occurs during canceling the
source.
* When this source [=readable stream=] closes, |destination| will be closed, unless
{{StreamPipeOptions/preventClose}} is truthy. The returned promise will be fulfilled once this
process completes, unless an error is encountered while closing the destination, in which case
it will be rejected with that error.
* If |destination| starts out closed or closing, this source [=readable stream=] will be [=cancel
a readable stream|canceled=], unless {{StreamPipeOptions/preventCancel}} is truthy. The returned
promise will be rejected with an error indicating piping to a closed stream failed, or with any
error that occurs during canceling the source.
<p>The {{StreamPipeOptions/signal}} option can be set to an {{AbortSignal}} to allow aborting an
ongoing pipe operation via the corresponding {{AbortController}}. In this case, this source
[=readable stream=] will be [=cancel a readable stream|canceled=], and |destination| [=abort a
writable stream|aborted=], unless the respective options {{StreamPipeOptions/preventCancel}} or
{{StreamPipeOptions/preventAbort}} are set.
<dt><code>[<var ignore>branch1</var>, <var ignore>branch2</var>] = <var ignore>stream</var>.{{ReadableStream/tee()|tee}}()</code>
<dd>
<p>[=tee a readable stream|Tees=] this readable stream, returning a two-element array containing
the two resulting branches as new {{ReadableStream}} instances.
<p>Teeing a stream will [=locked to a reader|lock=] it, preventing any other consumer from
acquiring a reader. To [=cancel a readable stream|cancel=] the stream, cancel both of the
resulting branches; a composite cancellation reason will then be propagated to the stream's
[=underlying source=].
<p>If this stream is a [=readable byte stream=], then each branch will receive its own copy of
each [=chunk=]. If not, then the chunks seen in each branch will be the same object.
If the chunks are not immutable, this could allow interference between the two branches.
</dl>
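<div class="example">
The remark above about teed branches sharing chunk objects can be illustrated with a short sketch.
The <code>teeSharesChunks()</code> function is a hypothetical name used only here, and the stream
is assumed not to be a [=readable byte stream=].

```javascript
// Hypothetical demonstration: both branches of a teed (non-byte) stream see
// the very same chunk object, so a mutation made via one branch is visible
// through the other.
async function teeSharesChunks() {
  const chunk = { count: 0 };
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(chunk);
      controller.close();
    }
  });

  const [branch1, branch2] = source.tee();

  const first = (await branch1.getReader().read()).value;
  first.count++; // Mutate the chunk as seen by the first branch.

  const second = (await branch2.getReader().read()).value;

  return {
    sameObject: first === second,
    countSeenByBranch2: second.count,
    sourceLocked: source.locked
  };
}
```

Consumers that need isolation between branches would have to copy each chunk themselves before
modifying it.
</div>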
<div algorithm>
The <dfn id="rs-constructor" constructor for="ReadableStream" lt="ReadableStream(underlyingSource,
strategy)">new ReadableStream(|underlyingSource|, |strategy|)</dfn> constructor steps are:
1. If |underlyingSource| is missing, set it to null.
1. Let |underlyingSourceDict| be |underlyingSource|, [=converted to an IDL value=] of type
{{UnderlyingSource}}.
<p class="note">We cannot declare the |underlyingSource| argument as having the
{{UnderlyingSource}} type directly, because doing so would lose the reference to the original
object. We need to retain the object so we can [=invoke=] the various methods on it.
1. Perform ! [$InitializeReadableStream$]([=this=]).
1. If |underlyingSourceDict|["{{UnderlyingSource/type}}"] is "{{ReadableStreamType/bytes}}":
1. If |strategy|["{{QueuingStrategy/size}}"] [=map/exists=], throw a {{RangeError}} exception.
1. Let |highWaterMark| be ? [$ExtractHighWaterMark$](|strategy|, 0).
1. Perform ? [$SetUpReadableByteStreamControllerFromUnderlyingSource$]([=this=],
|underlyingSource|, |underlyingSourceDict|, |highWaterMark|).
1. Otherwise,
1. Assert: |underlyingSourceDict|["{{UnderlyingSource/type}}"] does not [=map/exist=].
1. Let |sizeAlgorithm| be ! [$ExtractSizeAlgorithm$](|strategy|).
1. Let |highWaterMark| be ? [$ExtractHighWaterMark$](|strategy|, 1).
1. Perform ? [$SetUpReadableStreamDefaultControllerFromUnderlyingSource$]([=this=],
|underlyingSource|, |underlyingSourceDict|, |highWaterMark|, |sizeAlgorithm|).
</div>
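<div class="example">
Some observable consequences of the constructor steps above can be sketched as follows. The
<code>inspectConstructorBehavior()</code> function is a hypothetical name used only for this
illustration; it reads the controller's <code>desiredSize</code> inside <code>start()</code> as a
way of observing the default [=high water mark=].

```javascript
// Hypothetical demonstration of defaults and errors applied by the constructor.
function inspectConstructorBehavior() {
  let defaultDesiredSize;
  let byteDesiredSize;

  // Default streams fall back to a high water mark of 1.
  new ReadableStream({
    start(controller) {
      defaultDesiredSize = controller.desiredSize;
    }
  });

  // Readable byte streams fall back to a high water mark of 0.
  new ReadableStream({
    type: "bytes",
    start(controller) {
      byteDesiredSize = controller.desiredSize;
    }
  });

  // A size() function in the strategy is rejected for readable byte streams.
  let sizeError;
  try {
    new ReadableStream({ type: "bytes" }, { size() { return 1; } });
  } catch (e) {
    sizeError = e;
  }

  // An exception thrown by start() is rethrown by the constructor.
  let startError;
  try {
    new ReadableStream({ start() { throw new Error("setup failed"); } });
  } catch (e) {
    startError = e;
  }

  return {
    defaultDesiredSize,
    byteDesiredSize,
    sizeErrorIsRangeError: sizeError instanceof RangeError,
    startErrorMessage: startError && startError.message
  };
}
```
</div>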
<div algorithm>
The static <dfn id="rs-from" method for="ReadableStream">from(|asyncIterable|)</dfn> method steps
are:
1. Return ? [$ReadableStreamFromIterable$](|asyncIterable|).
</div>
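<div class="example">
As a small sketch of adapting iterables, assuming an environment that implements
{{ReadableStream/from(asyncIterable)|ReadableStream.from()}} and async iteration of streams. The
<code>collect()</code>, <code>countdown()</code>, and <code>fromExamples()</code> functions are
hypothetical names introduced only for this illustration.

```javascript
// Hypothetical helper: gathers every chunk of a stream using async iteration.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return chunks;
}

// Hypothetical async generator yielding from, from - 1, ..., 1.
async function* countdown(from) {
  for (let n = from; n > 0; n--) {
    yield n;
  }
}

// Wraps a synchronous iterable (an array) and an async iterable (a generator).
async function fromExamples() {
  return {
    fromArray: await collect(ReadableStream.from(["a", "b"])),
    fromGenerator: await collect(ReadableStream.from(countdown(3)))
  };
}
```
</div>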
<div algorithm>
The <dfn id="rs-locked" attribute for="ReadableStream">locked</dfn> getter steps are:
1. Return ! [$IsReadableStreamLocked$]([=this=]).
</div>
<div algorithm>
The <dfn id="rs-cancel" method for="ReadableStream">cancel(|reason|)</dfn> method steps are:
1. If ! [$IsReadableStreamLocked$]([=this=]) is true, return [=a promise rejected with=] a
{{TypeError}} exception.
1. Return ! [$ReadableStreamCancel$]([=this=], |reason|).
</div>
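<div class="example">
The following sketch illustrates the two outcomes described for
{{ReadableStream/cancel(reason)|cancel()}}: rejection while the stream is locked, and a stream that
still closes even when the underlying source's cleanup fails. The
<code>demonstrateCancel()</code> function and its deliberately failing source are hypothetical.

```javascript
// Hypothetical demonstration of cancel() on a locked and an unlocked stream.
async function demonstrateCancel() {
  let receivedReason;
  const stream = new ReadableStream({
    cancel(reason) {
      receivedReason = reason;
      // Simulate a cleanup failure in the underlying source.
      throw new Error("cleanup failed");
    }
  });

  // While a reader holds the lock, stream.cancel() rejects without canceling.
  const reader = stream.getReader();
  const lockedError = await stream.cancel("ignored").catch((e) => e);
  reader.releaseLock();

  // Once unlocked, the reason reaches the underlying source, and the
  // source's failure is reported only to this caller.
  const cancelError = await stream.cancel("no longer needed").catch((e) => e);

  // The stream is closed rather than errored, so a read reports completion.
  const { done } = await stream.getReader().read();

  return {
    lockedRejectsWithTypeError: lockedError instanceof TypeError,
    receivedReason,
    cancelErrorMessage: cancelError.message,
    closedAfterCancel: done
  };
}
```
</div>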
<div algorithm>
The <dfn id="rs-get-reader" method for="ReadableStream">getReader(|options|)</dfn> method steps
are:
1. If |options|["{{ReadableStreamGetReaderOptions/mode}}"] does not [=map/exist=], return ?
[$AcquireReadableStreamDefaultReader$]([=this=]).
1. Assert: |options|["{{ReadableStreamGetReaderOptions/mode}}"] is
"{{ReadableStreamReaderMode/byob}}".
1. Return ? [$AcquireReadableStreamBYOBReader$]([=this=]).
<div class="example" id="example-read-all-chunks">
An example of an abstraction that might benefit from using a reader is a function like the
following, which is designed to read an entire readable stream into memory as an array of
[=chunks=].
<xmp highlight="js">
function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];
  return pump();
  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }
      chunks.push(value);
      return pump();
    });
  }
}
</xmp>
Note how the first thing it does is obtain a reader, and from then on it uses the reader
exclusively. This ensures that no other consumer can interfere with the stream, either by reading
chunks or by [=cancel a readable stream|canceling=] the stream.
</div>
</div>
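<div class="example">
A comparable sketch for the "<code>byob</code>" mode is given below. It assumes a
[=readable byte stream=] whose source fills
{{ReadableByteStreamController/byobRequest|controller.byobRequest}} and sets
<code>autoAllocateChunkSize</code>, so the same source can serve both kinds of reader. The
<code>makeByteStream()</code> and <code>readInto()</code> helpers are hypothetical names used only
for this illustration.

```javascript
// Hypothetical helper: exposes the bytes of `data` (a Uint8Array) as a
// readable byte stream, auto-allocating `chunkSize`-byte buffers for
// consumers that use a default reader.
function makeByteStream(data, chunkSize) {
  let offset = 0;
  return new ReadableStream({
    type: "bytes",
    autoAllocateChunkSize: chunkSize,
    pull(controller) {
      const request = controller.byobRequest;
      const view = request.view;
      const count = Math.min(view.byteLength, data.byteLength - offset);
      if (count === 0) {
        // Nothing left: close the stream, then release the pending request.
        controller.close();
        request.respond(0);
        return;
      }
      view.set(data.subarray(offset, offset + count));
      offset += count;
      request.respond(count);
    }
  });
}

// Hypothetical helper: reads up to `size` bytes into one developer-supplied buffer.
async function readInto(stream, size) {
  const reader = stream.getReader({ mode: "byob" });
  let buffer = new ArrayBuffer(size);
  let offset = 0;
  while (offset < size) {
    const { value, done } = await reader.read(new Uint8Array(buffer, offset, size - offset));
    if (value === undefined) {
      // Only expected if the stream was canceled, which this sketch never does.
      throw new TypeError("stream was canceled");
    }
    // The supplied buffer was transferred; continue with the one handed back.
    buffer = value.buffer;
    if (done) {
      break;
    }
    offset += value.byteLength;
  }
  reader.releaseLock();
  return new Uint8Array(buffer, 0, offset);
}
```

Because each read transfers the supplied buffer, the sketch always continues with
<code>value.buffer</code> rather than the original {{ArrayBuffer}}.
</div>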
<div algorithm>
The <dfn id="rs-pipe-through" method for="ReadableStream">pipeThrough(|transform|, |options|)</dfn>
method steps are:
1. If ! [$IsReadableStreamLocked$]([=this=]) is true, throw a {{TypeError}} exception.
1. If ! [$IsWritableStreamLocked$](|transform|["{{ReadableWritablePair/writable}}"]) is true, throw
a {{TypeError}} exception.
1. Let |signal| be |options|["{{StreamPipeOptions/signal}}"] if it [=map/exists=], or undefined
otherwise.
1. Let |promise| be ! [$ReadableStreamPipeTo$]([=this=],
|transform|["{{ReadableWritablePair/writable}}"],
|options|["{{StreamPipeOptions/preventClose}}"],
|options|["{{StreamPipeOptions/preventAbort}}"],
|options|["{{StreamPipeOptions/preventCancel}}"], |signal|).
1. Set |promise|.\[[PromiseIsHandled]] to true.
1. Return |transform|["{{ReadableWritablePair/readable}}"].
<div class="example" id="example-pipe-chain">
A typical example of constructing a [=pipe chain=] using {{ReadableStream/pipeThrough(transform,
options)}} would look like:
<xmp highlight="js">
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
</xmp>
</div>
</div>
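<div class="example">
A self-contained sketch of a one-stage pipe chain is given below. The
<code>upperCaseTransform()</code> and <code>runPipeChain()</code> functions are hypothetical names
introduced only for this illustration; any <code>{ writable, readable }</code> pair could take the
place of the {{TransformStream}}.

```javascript
// Hypothetical transform: upper-cases each string chunk passing through it.
function upperCaseTransform() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(String(chunk).toUpperCase());
    }
  });
}

// Pipes a small in-memory source through the transform into a collecting sink.
async function runPipeChain() {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue("a");
      controller.enqueue("b");
      controller.close();
    }
  });

  const received = [];
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    }
  });

  // pipeThrough() returns the readable side of the pair for further chaining.
  const readable = source.pipeThrough(upperCaseTransform());
  // The source is locked as soon as the pipe has been set up.
  const lockedDuringPipe = source.locked;

  await readable.pipeTo(sink);
  return { received, lockedDuringPipe };
}
```
</div>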
<div algorithm>
The <dfn id="rs-pipe-to" method for="ReadableStream">pipeTo(|destination|, |options|)</dfn>
method steps are:
1. If ! [$IsReadableStreamLocked$]([=this=]) is true, return [=a promise rejected with=] a
{{TypeError}} exception.
1. If ! [$IsWritableStreamLocked$](|destination|) is true, return [=a promise rejected with=] a
{{TypeError}} exception.
1. Let |signal| be |options|["{{StreamPipeOptions/signal}}"] if it [=map/exists=], or undefined
otherwise.
1. Return ! [$ReadableStreamPipeTo$]([=this=], |destination|,
|options|["{{StreamPipeOptions/preventClose}}"],
|options|["{{StreamPipeOptions/preventAbort}}"],
|options|["{{StreamPipeOptions/preventCancel}}"], |signal|).
<div class="example" id="example-pipe-abortsignal">
An ongoing [=pipe=] operation can be stopped using an {{AbortSignal}}, as follows:
<xmp highlight="js">
const controller = new AbortController();
readable.pipeTo(writable, { signal: controller.signal });
// ... some time later ...
controller.abort();
</xmp>
(The above omits error handling for the promise returned by {{ReadableStream/pipeTo()}}.
Additionally, the impact of the {{StreamPipeOptions/preventAbort}} and
{{StreamPipeOptions/preventCancel}} options on what happens when piping is stopped is worth
considering.)
</div>
<div class="example" id="example-pipe-switch-dest">
The above technique can be used to switch the {{ReadableStream}} being piped, while writing into
the same {{WritableStream}}:
<xmp highlight="js">
const controller = new AbortController();
const pipePromise = readable1.pipeTo(writable, { preventAbort: true, signal: controller.signal });
// ... some time later ...
controller.abort();
// Wait for the pipe to complete before starting a new one:
try {
  await pipePromise;
} catch (e) {
  // Swallow "AbortError" DOMExceptions as expected, but rethrow any unexpected failures.
  if (e.name !== "AbortError") {
    throw e;
  }
}
// Start the new pipe!
readable2.pipeTo(writable);
</xmp>
</div>
</div>
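<div class="example">
The effect of {{StreamPipeOptions/preventAbort}} when the source errors can be sketched as follows.
The <code>pipeFromFailingSource()</code> function is a hypothetical name used only for this
illustration, and its source is made to fail immediately.

```javascript
// Hypothetical demonstration: pipes an already-errored source into a sink
// that records whether, and with what reason, it was aborted.
async function pipeFromFailingSource(options) {
  const failure = new Error("source failed");
  const source = new ReadableStream({
    start(controller) {
      controller.error(failure);
    }
  });

  let aborted = false;
  let abortReason;
  const destination = new WritableStream({
    abort(reason) {
      aborted = true;
      abortReason = reason;
    }
  });

  let rejection;
  try {
    await source.pipeTo(destination, options);
  } catch (e) {
    rejection = e;
  }

  return {
    rejectedWithSourceError: rejection === failure,
    destinationAborted: aborted,
    abortedWithSourceError: abortReason === failure
  };
}
```

Calling <code>pipeFromFailingSource()</code> with no options is expected to report the destination
as aborted with the source's error, while passing <code>{ preventAbort: true }</code> leaves the
destination untouched; in both cases the returned promise rejects with the source's error.
</div>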
<div algorithm>
The <dfn id="rs-tee" method for="ReadableStream">tee()</dfn> method steps are:
1. Return ? [$ReadableStreamTee$]([=this=], false).
<div class="example" id="example-tee-and-pipe">
Teeing a stream is most useful when you wish to let two independent consumers read from the stream
in parallel, perhaps even at different speeds. For example, given a writable stream
<code>cacheEntry</code> representing an on-disk file, and another writable stream