Skip to content

Commit

Permalink
Use .pipe in .import if available
Browse files Browse the repository at this point in the history
Required for #71
  • Loading branch information
Tpt authored Aug 17, 2022
1 parent 736be04 commit df0af6d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
20 changes: 13 additions & 7 deletions lib/JsonLdParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as RDF from "@rdfjs/types";
// tslint:disable-next-line:no-var-requires
const Parser = require('jsonparse');
import {ERROR_CODES, ErrorCoded, IDocumentLoader, JsonLdContext, Util as ContextUtil} from "jsonld-context-parser";
import {PassThrough, Transform} from "readable-stream";
import {PassThrough, Transform, Readable} from "readable-stream";
import {EntryHandlerArrayValue} from "./entryhandler/EntryHandlerArrayValue";
import {EntryHandlerContainer} from "./entryhandler/EntryHandlerContainer";
import {EntryHandlerInvalidFallback} from "./entryhandler/EntryHandlerInvalidFallback";
Expand Down Expand Up @@ -157,12 +157,18 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
* @return {RDF.Stream} A quad stream.
*/
public import(stream: EventEmitter): RDF.Stream {
const output = new PassThrough({ readableObjectMode: true });
stream.on('error', (error) => parsed.emit('error', error));
stream.on('data', (data) => output.push(data));
stream.on('end', () => output.push(null));
const parsed = output.pipe(new JsonLdParser(this.options));
return parsed;
if('pipe' in stream) {
stream.on('error', (error) => parsed.emit('error', error));
const parsed = (<Readable>stream).pipe(new JsonLdParser(this.options));
return parsed;
} else {
const output = new PassThrough({ readableObjectMode: true });
stream.on('error', (error) => parsed.emit('error', error));
stream.on('data', (data) => output.push(data));
stream.on('end', () => output.push(null));
const parsed = output.pipe(new JsonLdParser(this.options));
return parsed;
}
}

public _transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void): void {
Expand Down
25 changes: 25 additions & 0 deletions test/JsonLdParser-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {JsonLdParser} from "../index";
import arrayifyStream from 'arrayify-stream';
const streamifyString = require('streamify-string');
import * as RDF from "@rdfjs/types";
import { EventEmitter } from 'events';
import {DataFactory} from "rdf-data-factory";
import each from 'jest-each';
import "jest-rdf";
Expand Down Expand Up @@ -12445,10 +12446,34 @@ describe('JsonLdParser', () => {
]);
});


it('should parse a bad stream', async () => {
const stream = new EventEmitter();
const result = parser.import(stream);
stream.emit("data", `
{
"@id": "http://example.org/node",
"http://example.org/p": "def"
}`);
stream.emit("end");
return expect(await arrayifyStream(result)).toBeRdfIsomorphic([
DF.quad(DF.namedNode('http://example.org/node'),
DF.namedNode('http://example.org/p'),
DF.literal('def')),
]);
});

it('should forward error events', async () => {
const stream = new PassThrough();
stream._read = () => stream.emit('error', new Error('my error'));
return expect(arrayifyStream(parser.import(stream))).rejects.toThrow(new Error('my error'));
});

it('should forward error events with a bad stream', async () => {
const stream = new EventEmitter();
const result = parser.import(stream);
stream.emit('error', new Error('my error'));
return expect(arrayifyStream(result)).rejects.toThrow(new Error('my error'));
});
});
});

0 comments on commit df0af6d

Please sign in to comment.