Skip to content

Commit

Permalink
Try to fix premature stream close error with pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Feb 19, 2024
1 parent d5b2675 commit db96b3b
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions storage/lake/archive.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ async function *readBlocks(dataDir, shard, startBlockNumber, endBlockNumber) {
const extract = tar.extract();
const gunzip = zlib.createGunzip();
const readStream = fs.createReadStream(inFile);
const pipelinePromise = pipeline(readStream, gunzip, extract);

for await (const entry of extract) {
// Convert entry stream into data buffer
const data = await new Promise((resolve, reject) => {
const chunks = [];
entry.on('data', (chunk) => chunks.push(chunk));
entry.on('end', () => resolve(Buffer.concat(chunks)));
entry.on('error', reject);
});

const blockHeight = parseInt(entry.header.name.replace('.json', ''), 10);
yield { data, blockHeight };
}
const pipelinePromise = pipeline(readStream, gunzip, extract, async function *(extract, { signal }) {
for await (const entry of extract) {
if (signal.aborted) {
return;
}

const data = await new Promise((resolve, reject) => {
const chunks = [];
entry.on('data', (chunk) => chunks.push(chunk));
entry.on('end', () => resolve(Buffer.concat(chunks)));
entry.on('error', reject);
});

const blockHeight = parseInt(entry.header.name.replace('.json', ''), 10);
yield { data, blockHeight };
}
});

await pipelinePromise;
}
Expand Down

0 comments on commit db96b3b

Please sign in to comment.