Skip to content
This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Commit

Permalink
Replace custom FixedSource with built-in Readable#from
Browse files Browse the repository at this point in the history
  • Loading branch information
Florent Biville authored and fbiville committed Apr 20, 2020
1 parent f570060 commit 5842f89
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 73 deletions.
4 changes: 0 additions & 4 deletions spec/helpers/factories.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const { Message, Headers } = require("@projectriff/message");
const MappingTransform = require("../../lib/mapping-transform");
const FixedSource = require("./fixed-source");

function tuplesToObject(headers) {
const result = {};
Expand Down Expand Up @@ -57,9 +56,6 @@ module.exports = {
data: outputFrame,
};
},
newFixedSource: (data) => {
return new FixedSource(data);
},
newMappingTransform: (fn) => {
return new MappingTransform(fn);
},
Expand Down
23 changes: 0 additions & 23 deletions spec/helpers/fixed-source.js

This file was deleted.

14 changes: 5 additions & 9 deletions spec/input-unmarshaller.errors.spec.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
const { TextEncoder } = require("util");
const {
newFixedSource,
newInputFrame,
newInputSignal,
} = require("./helpers/factories");
const { newInputFrame, newInputSignal } = require("./helpers/factories");
const InputUnmarshaller = require("../lib/input-unmarshaller");
const { finished } = require("stream");
const { finished, Readable } = require("stream");

describe("input unmarshaller =>", () => {
const textEncoder = new TextEncoder();
Expand All @@ -25,7 +21,7 @@ describe("input unmarshaller =>", () => {
let unsupportedMediaTypeInputs;

beforeEach(() => {
unsupportedMediaTypeInputs = newFixedSource([
unsupportedMediaTypeInputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down Expand Up @@ -64,7 +60,7 @@ describe("input unmarshaller =>", () => {
let invalidInputs;

beforeEach(() => {
invalidInputs = newFixedSource([
invalidInputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down Expand Up @@ -107,7 +103,7 @@ describe("input unmarshaller =>", () => {
unmarshaller = new InputUnmarshaller((message) => {
throw new Error(message.payload + " ko");
});
inputs = newFixedSource([
inputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down
15 changes: 6 additions & 9 deletions spec/input-unmarshaller.spec.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
const { TextEncoder } = require("util");
const {
newFixedSource,
newInputFrame,
newInputSignal,
} = require("./helpers/factories");
const { Readable } = require("stream");
const { newInputFrame, newInputSignal } = require("./helpers/factories");
const InputUnmarshaller = require("../lib/input-unmarshaller");

describe("input unmarshaller =>", () => {
Expand All @@ -26,7 +23,7 @@ describe("input unmarshaller =>", () => {
const expectedPayloadCount = expectedPayloads.length;

beforeEach(() => {
inputs = newFixedSource([
inputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down Expand Up @@ -83,7 +80,7 @@ describe("input unmarshaller =>", () => {
const expectedPayloadCount = expectedPayloads.length;

beforeEach(() => {
inputs = newFixedSource([
inputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down Expand Up @@ -136,7 +133,7 @@ describe("input unmarshaller =>", () => {

beforeEach(() => {
unmarshaller = new InputUnmarshaller((msg) => msg.payload.age);
inputs = newFixedSource([
inputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down Expand Up @@ -189,7 +186,7 @@ describe("input unmarshaller =>", () => {
unmarshaller = new InputUnmarshaller((msg) =>
msg.headers.getValue("X-Files")
);
inputs = newFixedSource([
inputs = Readable.from([
newInputSignal(
newInputFrame(
0,
Expand Down
5 changes: 2 additions & 3 deletions spec/output-marshaller.errors.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const { newFixedSource } = require("./helpers/factories");
const OutputMarshaller = require("../lib/output-marshaller");
const { finished } = require("stream");
const { finished, Readable } = require("stream");

describe("output marshaller =>", () => {
["application/json", "application/cloudevents+json"].forEach(
Expand All @@ -11,7 +10,7 @@ describe("output marshaller =>", () => {

beforeEach(() => {
marshaller = new OutputMarshaller(0, mediaType);
outputPayloadSource = newFixedSource([Symbol(42)]);
outputPayloadSource = Readable.from([Symbol(42)]);
});

afterEach(() => {
Expand Down
6 changes: 3 additions & 3 deletions spec/output-marshaller.spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const { TextEncoder } = require("util");
const { Readable } = require("stream");
const outputSignalCustomEqual = require("./helpers/output-signal-custom-equality");
const OutputMarshaller = require("../lib/output-marshaller");
const {
newFixedSource,
newOutputFrame,
newOutputSignal,
newRiffHeaders,
Expand All @@ -23,7 +23,7 @@ describe("output marshaller =>", () => {
describe(`with ${mediaType} data =>`, () => {
beforeEach(() => {
jasmine.addCustomEqualityTester(outputSignalCustomEqual);
source = newFixedSource(outputPayloads);
source = Readable.from(outputPayloads);
marshaller = new OutputMarshaller(expectedIndex, mediaType);
});

Expand Down Expand Up @@ -74,7 +74,7 @@ describe("output marshaller =>", () => {
const messageHeaders = newRiffHeaders()
.addHeader("Content-Type", "text/csv", "ignored")
.addHeader("X-Custom-Header", "custom value");
source = newFixedSource([newRiffMessage(messageHeaders, payload)]);
source = Readable.from([newRiffMessage(messageHeaders, payload)]);
marshaller = new OutputMarshaller(0, mediaType);
});

Expand Down
6 changes: 3 additions & 3 deletions spec/request-reply-promoter.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { newFixedSource, newMappingTransform } = require("./helpers/factories");
const { newMappingTransform } = require("./helpers/factories");
const promoteFunction = require("../lib/request-reply-promoter");
const { PassThrough } = require("stream");
const { PassThrough, Readable } = require("stream");

describe("function promoter =>", () => {
const data = [1, 2, 4];
Expand All @@ -17,7 +17,7 @@ describe("function promoter =>", () => {
let source;

beforeEach(() => {
source = newFixedSource(data);
source = Readable.from(data);
streamingOutput = new PassThrough({ objectMode: true });
});

Expand Down
27 changes: 13 additions & 14 deletions spec/streaming-pipeline.errors.spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const { TextEncoder } = require("util");
const StreamingPipeline = require("../lib/streaming-pipeline");
const { finished, PassThrough, Transform } = require("stream");
const { finished, PassThrough, Readable, Transform } = require("stream");
const {
newFixedSource,
newInputFrame,
newInputSignal,
newMappingTransform,
Expand Down Expand Up @@ -52,7 +51,7 @@ describe("streaming pipeline =>", () => {

describe("with malformed input signals =>", () => {
beforeEach(() => {
fixedSource = newFixedSource(["not a signal"]);
fixedSource = Readable.from(["not a signal"]);
});

it("cancels the invocation", (done) => {
Expand Down Expand Up @@ -81,7 +80,7 @@ describe("streaming pipeline =>", () => {

describe("with an incomplete input signal =>", () => {
beforeEach(() => {
fixedSource = newFixedSource([newInputSignal(null)]);
fixedSource = Readable.from([newInputSignal(null)]);
});

it("cancels the invocation", (done) => {
Expand Down Expand Up @@ -110,7 +109,7 @@ describe("streaming pipeline =>", () => {

describe("with too many start signals =>", () => {
beforeEach(() => {
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(
newStartFrame(["text/plain"], ["input"], ["output"])
),
Expand Down Expand Up @@ -152,7 +151,7 @@ describe("streaming pipeline =>", () => {

describe("with a start signal with too many output content types =>", () => {
beforeEach(() => {
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(
newStartFrame(
["text/plain", "text/sgml", "text/yaml"],
Expand Down Expand Up @@ -189,7 +188,7 @@ describe("streaming pipeline =>", () => {

describe("with no start signal to start with =>", () => {
beforeEach(() => {
fixedSource = newFixedSource([
fixedSource = Readable.from([
newInputSignal(
newInputFrame(
42,
Expand Down Expand Up @@ -236,7 +235,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(
newStartFrame(["text/zglorbf"], ["in"], ["out"])
),
Expand Down Expand Up @@ -282,7 +281,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])),
newInputSignal(
newInputFrame(
Expand Down Expand Up @@ -327,7 +326,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])),
]);
});
Expand Down Expand Up @@ -368,7 +367,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])),
newInputSignal(
newInputFrame(
Expand Down Expand Up @@ -417,7 +416,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])),
newInputSignal(
newInputFrame(
Expand Down Expand Up @@ -463,7 +462,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(newStartFrame(["text/plain"], ["in"], ["out"])),
newInputSignal(
newInputFrame(
Expand Down Expand Up @@ -549,7 +548,7 @@ describe("streaming pipeline =>", () => {
userFunction,
destinationStream
);
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(
newStartFrame(["text/plain"], ["in1", "in2"], ["out"])
),
Expand Down
9 changes: 4 additions & 5 deletions spec/streaming-pipeline.spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const { TextEncoder } = require("util");
const StreamingPipeline = require("../lib/streaming-pipeline");
const { PassThrough } = require("stream");
const { PassThrough, Readable } = require("stream");
const {
newFixedSource,
newInputFrame,
newInputSignal,
newMappingTransform,
Expand Down Expand Up @@ -45,7 +44,7 @@ describe("streaming pipeline =>", () => {

describe("with valid input signals =>", () => {
beforeEach(() => {
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(
newStartFrame(["text/plain"], ["input1"], ["output1"])
),
Expand Down Expand Up @@ -92,7 +91,7 @@ describe("streaming pipeline =>", () => {

describe("with a closed input stream =>", () => {
beforeEach(() => {
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(newStartFrame([], ["ignored"], [])),
]);
});
Expand Down Expand Up @@ -125,7 +124,7 @@ describe("streaming pipeline =>", () => {
describe("with an immediately closing output stream =>", () => {
const data = ["1", "4", "9"];
beforeEach(() => {
fixedSource = newFixedSource([
fixedSource = Readable.from([
newStartSignal(
newStartFrame(
["text/plain", "text/plain"],
Expand Down

0 comments on commit 5842f89

Please sign in to comment.