diff --git a/package.json b/package.json index 5c95f538a00e1..97062e04d8d1b 100644 --- a/package.json +++ b/package.json @@ -145,6 +145,7 @@ "@kbn/test-subj-selector": "0.2.1", "@kbn/ui-framework": "1.0.0", "@kbn/ui-shared-deps": "1.0.0", + "@types/yauzl": "^2.9.1", "JSONStream": "1.3.5", "abortcontroller-polyfill": "^1.4.0", "accept": "3.0.2", diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/cache.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/cache.ts index af11bc7f6c831..e9c8317a6251d 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/cache.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/cache.ts @@ -3,6 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ +import { pkgToPkgKey } from './index'; const cache: Map = new Map(); export const cacheGet = (key: string) => cache.get(key); @@ -10,4 +11,10 @@ export const cacheSet = (key: string, value: Buffer) => cache.set(key, value); export const cacheHas = (key: string) => cache.has(key); export const cacheClear = () => cache.clear(); export const cacheDelete = (key: string) => cache.delete(key); -export const getCacheKey = (key: string) => key + '.tar.gz'; + +const archiveLocationCache: Map = new Map(); +export const getArchiveLocation = (name: string, version: string) => + archiveLocationCache.get(pkgToPkgKey({ name, version })); + +export const setArchiveLocation = (name: string, version: string, location: string) => + archiveLocationCache.set(pkgToPkgKey({ name, version }), location); diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/extract.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/extract.ts index 1f708c5edbcc7..6d029b54a6317 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/extract.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/extract.ts @@ -5,6 +5,7 @@ */ import tar from 'tar'; +import yauzl from 'yauzl'; import { bufferToStream, streamToBuffer } from './streams'; export interface ArchiveEntry { @@ -30,3 +31,40 @@ export async function untarBuffer( deflatedStream.pipe(inflateStream); }); } + +export async function unzipBuffer( + buffer: Buffer, + filter = (entry: ArchiveEntry): boolean => true, + onEntry = (entry: ArchiveEntry): void => {} +): Promise { + const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true }); + zipfile.readEntry(); + zipfile.on('entry', async (entry: yauzl.Entry) => { + const path = entry.fileName; + if (!filter({ path })) return zipfile.readEntry(); + + const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer); + onEntry({ buffer: entryBuffer, path }); + zipfile.readEntry(); + }); + return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject)); +} + +function yauzlFromBuffer(buffer: Buffer, opts: yauzl.Options): Promise { + return new Promise((resolve, reject) => + yauzl.fromBuffer(buffer, opts, (err?: Error, handle?: yauzl.ZipFile) => + err ? reject(err) : resolve(handle) + ) + ); +} + +function getZipReadStream( + zipfile: yauzl.ZipFile, + entry: yauzl.Entry +): Promise { + return new Promise((resolve, reject) => + zipfile.openReadStream(entry, (err?: Error, readStream?: NodeJS.ReadableStream) => + err ? reject(err) : resolve(readStream) + ) + ); +} diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/index.test.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/index.test.ts index 085dc990fa376..b40638eefbae2 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/index.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/index.test.ts @@ -5,7 +5,17 @@ */ import { AssetParts } from '../../../types'; -import { pathParts, splitPkgKey } from './index'; +import { getBufferExtractor, pathParts, splitPkgKey } from './index'; +import { getArchiveLocation } from './cache'; +import { untarBuffer, unzipBuffer } from './extract'; + +jest.mock('./cache', () => { + return { + getArchiveLocation: jest.fn(), + }; +}); + +const mockedGetArchiveLocation = getArchiveLocation as jest.Mock; const testPaths = [ { @@ -80,3 +90,21 @@ describe('splitPkgKey tests', () => { expect(pkgVersion).toBe('0.13.0-alpha.1+abcd'); }); }); + +describe('getBufferExtractor', () => { + it('throws if the archive has not been downloaded/cached yet', () => { + expect(() => getBufferExtractor('missing', '1.2.3')).toThrow('no archive location'); + }); + + it('returns unzipBuffer if the archive key ends in .zip', () => { + mockedGetArchiveLocation.mockImplementation(() => '.zip'); + const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c'); + expect(extractor).toBe(unzipBuffer); + }); + + it('returns untarBuffer if the key ends in anything else', () => { + mockedGetArchiveLocation.mockImplementation(() => 'xyz'); + const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c'); + expect(extractor).toBe(untarBuffer); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts index b635378960468..61c8cd4aabb7b 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts @@ -17,8 +17,8 @@ import { RegistrySearchResults, RegistrySearchResult, } from '../../../types'; -import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache'; -import { ArchiveEntry, untarBuffer } from './extract'; +import { cacheGet, cacheSet, cacheHas, getArchiveLocation, setArchiveLocation } from './cache'; +import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract'; import { fetchUrl, getResponse, getResponseStream } from './requests'; import { streamToBuffer } from './streams'; import { getRegistryUrl } from './registry_url'; @@ -130,7 +130,9 @@ export async function getArchiveInfo( filter = (entry: ArchiveEntry): boolean => true ): Promise { const paths: string[] = []; - const onEntry = (entry: ArchiveEntry) => { + const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion); + const bufferExtractor = getBufferExtractor(pkgName, pkgVersion); + await bufferExtractor(archiveBuffer, filter, (entry: ArchiveEntry) => { const { path, buffer } = entry; const { file } = pathParts(path); if (!file) return; @@ -138,9 +140,7 @@ export async function getArchiveInfo( cacheSet(path, buffer); paths.push(path); } - }; - - await extract(pkgName, pkgVersion, filter, onEntry); + }); return paths; } @@ -175,24 +175,20 @@ export function pathParts(path: string): AssetParts { } as AssetParts; } -async function extract( - pkgName: string, - pkgVersion: string, - filter = (entry: ArchiveEntry): boolean => true, - onEntry: (entry: ArchiveEntry) => void -) { - const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion); +export function getBufferExtractor(pkgName: string, pkgVersion: string) { + const archiveLocation = getArchiveLocation(pkgName, pkgVersion); + if (!archiveLocation) throw new Error(`no archive location for ${pkgName} ${pkgVersion}`); + const isZip = archiveLocation.endsWith('.zip'); + const bufferExtractor = isZip ? unzipBuffer : untarBuffer; - return untarBuffer(archiveBuffer, filter, onEntry); + return bufferExtractor; } async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise { - // assume .tar.gz for now. add support for .zip if/when we need it - const key = getCacheKey(`${pkgName}-${pkgVersion}`); - let buffer = cacheGet(key); + const key = getArchiveLocation(pkgName, pkgVersion); + let buffer = key && cacheGet(key); if (!buffer) { buffer = await fetchArchiveBuffer(pkgName, pkgVersion); - cacheSet(key, buffer); } if (buffer) { @@ -203,16 +199,21 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro } export async function ensureCachedArchiveInfo(name: string, version: string) { - const pkgkey = getCacheKey(`${name}-${version}`); - if (!cacheHas(pkgkey)) { + const pkgkey = getArchiveLocation(name, version); + if (!pkgkey || !cacheHas(pkgkey)) { await getArchiveInfo(name, version); } } async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise { const { download: archivePath } = await fetchInfo(pkgName, pkgVersion); - const registryUrl = getRegistryUrl(); - return getResponseStream(`${registryUrl}${archivePath}`).then(streamToBuffer); + const archiveUrl = `${getRegistryUrl()}${archivePath}`; + const buffer = await getResponseStream(archiveUrl).then(streamToBuffer); + + setArchiveLocation(pkgName, pkgVersion, archivePath); + cacheSet(archivePath, buffer); + + return buffer; } export function getAsset(key: string) {