diff --git a/storage/lake/archive.js b/storage/lake/archive.js index a0a368f..1855a4d 100644 --- a/storage/lake/archive.js +++ b/storage/lake/archive.js @@ -24,20 +24,23 @@ async function *readBlocks(dataDir, shard, startBlockNumber, endBlockNumber) { const extract = tar.extract(); const gunzip = zlib.createGunzip(); const readStream = fs.createReadStream(inFile); - const pipelinePromise = pipeline(readStream, gunzip, extract); - - for await (const entry of extract) { - // Convert entry stream into data buffer - const data = await new Promise((resolve, reject) => { - const chunks = []; - entry.on('data', (chunk) => chunks.push(chunk)); - entry.on('end', () => resolve(Buffer.concat(chunks))); - entry.on('error', reject); - }); - - const blockHeight = parseInt(entry.header.name.replace('.json', ''), 10); - yield { data, blockHeight }; - } + const pipelinePromise = pipeline(readStream, gunzip, extract, async function *(extract, { signal }) { + for await (const entry of extract) { + if (signal.aborted) { + return; + } + + const data = await new Promise((resolve, reject) => { + const chunks = []; + entry.on('data', (chunk) => chunks.push(chunk)); + entry.on('end', () => resolve(Buffer.concat(chunks))); + entry.on('error', reject); + }); + + const blockHeight = parseInt(entry.header.name.replace('.json', ''), 10); + yield { data, blockHeight }; + } + }); await pipelinePromise; }