This repository has been archived by the owner on Jun 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: Refactor to remove streams and match js reader api changes
Removes streams for massive code reduction. Entire utils module gone. About a million lines from the main file gone. All gone. No one is left. They're all dead, Dave. BREAKING CHANGE: API is now a single async function. Input changed to an array of feeds (each feed an array of feed objects). Output changed to a string bundle.
- Loading branch information
1 parent
d6a07f0
commit 79cdef6
Showing
6 changed files
with
1,757 additions
and
1,365 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,82 +1,28 @@ | ||
'use strict'; | ||
|
||
const { Readable, PassThrough, Stream } = require('readable-stream'); | ||
const { parse } = require('JSONStream'); | ||
const mergeStream = require('merge-stream'); | ||
const { dedupe, sort, setOrder } = require('./util'); | ||
const assert = require('assert'); | ||
|
||
module.exports = class Reader extends Readable { | ||
constructor(streams) { | ||
super(); | ||
|
||
assert( | ||
streams instanceof Stream || Array.isArray(streams), | ||
`Expected first argument to new Reader() to be a stream or array of streams. | ||
Instead got ${typeof stream}` | ||
); | ||
|
||
if (!Array.isArray(streams)) { | ||
streams = [streams]; | ||
} | ||
|
||
assert( | ||
streams.every(stream => stream instanceof Stream), | ||
'Expected any/all arguments given to Reader constructor to be subclasses of Stream.' | ||
); | ||
|
||
assert( | ||
streams.length, | ||
'Expected at least one stream to be provided to new Reader(). Got none.' | ||
); | ||
|
||
const merged = mergeStream(); | ||
|
||
let count = 0; | ||
streams.forEach((readStream, index) => { | ||
const tmpStream = new PassThrough({ objectMode: true }); | ||
merged.add(tmpStream); | ||
|
||
readStream.on('file found', file => { | ||
this.emit('file found', file); | ||
readStream | ||
.pipe(parse('*')) | ||
.on('error', err => { | ||
this.emit('error', err); | ||
}) | ||
.pipe(setOrder(index)) | ||
.pipe(tmpStream); | ||
|
||
count++; | ||
if (count === streams.length) { | ||
this.emit('pipeline ready'); | ||
} | ||
}); | ||
|
||
readStream.on('file not found', file => { | ||
this.emit('file not found', file); | ||
tmpStream.end(); | ||
|
||
count++; | ||
if (count === streams.length) { | ||
this.emit('pipeline ready'); | ||
} | ||
}); | ||
}); | ||
|
||
this.data = merged.pipe(sort()).pipe(dedupe()); | ||
this.data.pause(); | ||
|
||
this.data.on('data', chunk => { | ||
this.push(`${chunk.content.trim()}\n\n`); | ||
}); | ||
|
||
this.data.on('end', () => { | ||
this.push(null); | ||
}); | ||
} | ||
|
||
_read() { | ||
return this.data.resume(); | ||
} | ||
module.exports = async function reader(feeds = []) { | ||
assert( | ||
feeds.length, | ||
`Expected at least 1 feed to be given. Instead got "${feeds.length}"` | ||
); | ||
assert( | ||
feeds.every(Array.isArray), | ||
`Expected every feed to be an array. Instead got "${feeds.join(', ')}"` | ||
); | ||
|
||
feeds = feeds.reduce((prev, current) => prev.concat(current), []); | ||
|
||
const map = new Map(); | ||
feeds.forEach(feed => { | ||
// because map preserves order, when we get a duplicate we actually | ||
// need to append to the end of the map. We do that by deleting | ||
// first and then adding to the map | ||
map.delete(feed.id); | ||
map.set(feed.id, feed); | ||
}); | ||
return Array.from(map.values()) | ||
.map(feed => feed.content.trim()) | ||
.join('\n\n'); | ||
}; |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.