diff --git a/spec/helpers/factories.js b/spec/helpers/factories.js index 3c649d6..c842f5d 100644 --- a/spec/helpers/factories.js +++ b/spec/helpers/factories.js @@ -1,6 +1,5 @@ const { Message, Headers } = require("@projectriff/message"); const MappingTransform = require("../../lib/mapping-transform"); -const FixedSource = require("./fixed-source"); function tuplesToObject(headers) { const result = {}; @@ -57,9 +56,6 @@ module.exports = { data: outputFrame, }; }, - newFixedSource: (data) => { - return new FixedSource(data); - }, newMappingTransform: (fn) => { return new MappingTransform(fn); }, diff --git a/spec/helpers/fixed-source.js b/spec/helpers/fixed-source.js deleted file mode 100644 index 62204e4..0000000 --- a/spec/helpers/fixed-source.js +++ /dev/null @@ -1,23 +0,0 @@ -const { Readable } = require("stream"); -const { min } = Math; - -module.exports = class FixedSource extends Readable { - constructor(values) { - super({ objectMode: true }); - this.values = values; - this.length = this.values.length; - this._index = 0; - } - - _read(size) { - if (this._index < this.length) { - const end = min(this._index + size, this.length); - for (let i = this._index; i < end; i++) { - this.push(this.values[i]); - } - this._index += end; - } else { - this.push(null); - } - } -}; diff --git a/spec/input-unmarshaller.errors.spec.js b/spec/input-unmarshaller.errors.spec.js index b0b0abe..a63ea82 100644 --- a/spec/input-unmarshaller.errors.spec.js +++ b/spec/input-unmarshaller.errors.spec.js @@ -1,11 +1,7 @@ const { TextEncoder } = require("util"); -const { - newFixedSource, - newInputFrame, - newInputSignal, -} = require("./helpers/factories"); +const { newInputFrame, newInputSignal } = require("./helpers/factories"); const InputUnmarshaller = require("../lib/input-unmarshaller"); -const { finished } = require("stream"); +const { finished, Readable } = require("stream"); describe("input unmarshaller =>", () => { const textEncoder = new TextEncoder(); @@ -25,7 +21,7 @@ describe("input unmarshaller =>", () => { let unsupportedMediaTypeInputs; beforeEach(() => { - unsupportedMediaTypeInputs = newFixedSource([ + unsupportedMediaTypeInputs = Readable.from([ newInputSignal( newInputFrame( 0, @@ -64,7 +60,7 @@ describe("input unmarshaller =>", () => { let invalidInputs; beforeEach(() => { - invalidInputs = newFixedSource([ + invalidInputs = Readable.from([ newInputSignal( newInputFrame( 0, @@ -107,7 +103,7 @@ describe("input unmarshaller =>", () => { unmarshaller = new InputUnmarshaller((message) => { throw new Error(message.payload + " ko"); }); - inputs = newFixedSource([ + inputs = Readable.from([ newInputSignal( newInputFrame( 0, diff --git a/spec/input-unmarshaller.spec.js b/spec/input-unmarshaller.spec.js index 135716c..d16a313 100644 --- a/spec/input-unmarshaller.spec.js +++ b/spec/input-unmarshaller.spec.js @@ -1,9 +1,6 @@ const { TextEncoder } = require("util"); -const { - newFixedSource, - newInputFrame, - newInputSignal, -} = require("./helpers/factories"); +const { Readable } = require("stream"); +const { newInputFrame, newInputSignal } = require("./helpers/factories"); const InputUnmarshaller = require("../lib/input-unmarshaller"); describe("input unmarshaller =>", () => { @@ -26,7 +23,7 @@ describe("input unmarshaller =>", () => { const expectedPayloadCount = expectedPayloads.length; beforeEach(() => { - inputs = newFixedSource([ + inputs = Readable.from([ newInputSignal( newInputFrame( 0, @@ -83,7 +80,7 @@ describe("input unmarshaller =>", () => { const expectedPayloadCount = expectedPayloads.length; beforeEach(() => { - inputs = newFixedSource([ + inputs = Readable.from([ newInputSignal( newInputFrame( 0, @@ -136,7 +133,7 @@ describe("input unmarshaller =>", () => { beforeEach(() => { unmarshaller = new InputUnmarshaller((msg) => msg.payload.age); - inputs = newFixedSource([ + inputs = Readable.from([ newInputSignal( newInputFrame( 0, @@ -189,7 +186,7 @@ describe("input unmarshaller =>", () => { unmarshaller = new InputUnmarshaller((msg) => msg.headers.getValue("X-Files") ); - inputs = newFixedSource([ + inputs = Readable.from([ newInputSignal( newInputFrame( 0, diff --git a/spec/output-marshaller.errors.spec.js b/spec/output-marshaller.errors.spec.js index 9417cea..d260ce3 100644 --- a/spec/output-marshaller.errors.spec.js +++ b/spec/output-marshaller.errors.spec.js @@ -1,6 +1,5 @@ -const { newFixedSource } = require("./helpers/factories"); const OutputMarshaller = require("../lib/output-marshaller"); -const { finished } = require("stream"); +const { finished, Readable } = require("stream"); describe("output marshaller =>", () => { ["application/json", "application/cloudevents+json"].forEach( @@ -11,7 +10,7 @@ describe("output marshaller =>", () => { beforeEach(() => { marshaller = new OutputMarshaller(0, mediaType); - outputPayloadSource = newFixedSource([Symbol(42)]); + outputPayloadSource = Readable.from([Symbol(42)]); }); afterEach(() => { diff --git a/spec/output-marshaller.spec.js b/spec/output-marshaller.spec.js index 5ad10cf..81de73a 100644 --- a/spec/output-marshaller.spec.js +++ b/spec/output-marshaller.spec.js @@ -1,8 +1,8 @@ const { TextEncoder } = require("util"); +const { Readable } = require("stream"); const outputSignalCustomEqual = require("./helpers/output-signal-custom-equality"); const OutputMarshaller = require("../lib/output-marshaller"); const { - newFixedSource, newOutputFrame, newOutputSignal, newRiffHeaders, @@ -23,7 +23,7 @@ describe("output marshaller =>", () => { describe(`with ${mediaType} data =>`, () => { beforeEach(() => { jasmine.addCustomEqualityTester(outputSignalCustomEqual); - source = newFixedSource(outputPayloads); + source = Readable.from(outputPayloads); marshaller = new OutputMarshaller(expectedIndex, mediaType); }); @@ -74,7 +74,7 @@ describe("output marshaller =>", () => { const messageHeaders = newRiffHeaders() .addHeader("Content-Type", "text/csv", "ignored") .addHeader("X-Custom-Header", "custom value"); - source = newFixedSource([newRiffMessage(messageHeaders, payload)]); + source = Readable.from([newRiffMessage(messageHeaders, payload)]); marshaller = new OutputMarshaller(0, mediaType); }); diff --git a/spec/request-reply-promoter.spec.js b/spec/request-reply-promoter.spec.js index b095f09..810c91f 100644 --- a/spec/request-reply-promoter.spec.js +++ b/spec/request-reply-promoter.spec.js @@ -1,6 +1,6 @@ -const { newFixedSource, newMappingTransform } = require("./helpers/factories"); +const { newMappingTransform } = require("./helpers/factories"); const promoteFunction = require("../lib/request-reply-promoter"); -const { PassThrough } = require("stream"); +const { PassThrough, Readable } = require("stream"); describe("function promoter =>", () => { const data = [1, 2, 4]; @@ -17,7 +17,7 @@ describe("function promoter =>", () => { let source; beforeEach(() => { - source = newFixedSource(data); + source = Readable.from(data); streamingOutput = new PassThrough({ objectMode: true }); }); diff --git a/spec/streaming-pipeline.errors.spec.js b/spec/streaming-pipeline.errors.spec.js index 229aa11..a21a744 100644 --- a/spec/streaming-pipeline.errors.spec.js +++ b/spec/streaming-pipeline.errors.spec.js @@ -1,8 +1,7 @@ const { TextEncoder } = require("util"); const StreamingPipeline = require("../lib/streaming-pipeline"); -const { finished, PassThrough, Transform } = require("stream"); +const { finished, PassThrough, Readable, Transform } = require("stream"); const { - newFixedSource, newInputFrame, newInputSignal, newMappingTransform, @@ -52,7 +51,7 @@ describe("streaming pipeline =>", () => { describe("with malformed input signals =>", () => { beforeEach(() => { - fixedSource = newFixedSource(["not a signal"]); + fixedSource = Readable.from(["not a signal"]); }); it("cancels the invocation", (done) => { @@ -81,7 +80,7 @@ describe("streaming pipeline =>", () => { describe("with an incomplete input signal =>", () => { beforeEach(() => { - fixedSource = newFixedSource([newInputSignal(null)]); + fixedSource = Readable.from([newInputSignal(null)]); }); it("cancels the invocation", (done) => { @@ -110,7 +109,7 @@ describe("streaming pipeline =>", () => { describe("with too many start signals =>", () => { beforeEach(() => { - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal( newStartFrame(["text/plain"], ["input"], ["output"]) ), @@ -152,7 +151,7 @@ describe("streaming pipeline =>", () => { describe("with a start signal with too many output content types =>", () => { beforeEach(() => { - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal( newStartFrame( ["text/plain", "text/sgml", "text/yaml"], @@ -189,7 +188,7 @@ describe("streaming pipeline =>", () => { describe("with no start signal to start with =>", () => { beforeEach(() => { - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newInputSignal( newInputFrame( 42, @@ -236,7 +235,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal( newStartFrame(["text/zglorbf"], ["in"], ["out"]) ), @@ -282,7 +281,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])), newInputSignal( newInputFrame( @@ -327,7 +326,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])), ]); }); @@ -368,7 +367,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])), newInputSignal( newInputFrame( @@ -417,7 +416,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])), newInputSignal( newInputFrame( @@ -463,7 +462,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])), newInputSignal( newInputFrame( @@ -549,7 +548,7 @@ describe("streaming pipeline =>", () => { userFunction, destinationStream ); - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal( newStartFrame(["text/plain"], ["in1", "in2"], ["out"]) ), diff --git a/spec/streaming-pipeline.spec.js b/spec/streaming-pipeline.spec.js index 17aa696..606acf6 100644 --- a/spec/streaming-pipeline.spec.js +++ b/spec/streaming-pipeline.spec.js @@ -1,8 +1,7 @@ const { TextEncoder } = require("util"); const StreamingPipeline = require("../lib/streaming-pipeline"); -const { PassThrough } = require("stream"); +const { PassThrough, Readable } = require("stream"); const { - newFixedSource, newInputFrame, newInputSignal, newMappingTransform, @@ -45,7 +44,7 @@ describe("streaming pipeline =>", () => { describe("with valid input signals =>", () => { beforeEach(() => { - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal( newStartFrame(["text/plain"], ["input1"], ["output1"]) ), @@ -92,7 +91,7 @@ describe("streaming pipeline =>", () => { describe("with a closed input stream =>", () => { beforeEach(() => { - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal(newStartFrame([], ["ignored"], [])), ]); }); @@ -125,7 +124,7 @@ describe("streaming pipeline =>", () => { describe("with an immediately closing output stream =>", () => { const data = ["1", "4", "9"]; beforeEach(() => { - fixedSource = newFixedSource([ + fixedSource = Readable.from([ newStartSignal( newStartFrame( ["text/plain", "text/plain"],