Skip to content

Commit

Permalink
[Ingest Manager] Support both zip & tar archives from Registry (#76197)
Browse files Browse the repository at this point in the history
* Quick pass at restoring support for both zip & tar

Restored unzip functions from #43764

Persist the `download` value returned by EPR (e.g. `/epr/system/system-0.5.3.zip` or  `/epr/system/system-0.5.3.tar.gz`) as "archive key" for a package name/version combo.

The same name & version should always return the same archive, so we keep the value initially given by the registry.

Based on that value, we decide which decompression to use.

* Use template literal vs JSON.stringify for keygen

* Factor unzip/untar logic out to getBufferExtractor

 * Add tests for getBufferExtractor
 * Replace `[aA]rchiveKey*` with `[aA]rchiveLocation*`

* Include given name & version in error message

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
John Schulz and elasticmachine authored Sep 1, 2020
1 parent 030d5e1 commit 9a7c418
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 24 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
"@kbn/test-subj-selector": "0.2.1",
"@kbn/ui-framework": "1.0.0",
"@kbn/ui-shared-deps": "1.0.0",
"@types/yauzl": "^2.9.1",
"JSONStream": "1.3.5",
"abortcontroller-polyfill": "^1.4.0",
"accept": "3.0.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { pkgToPkgKey } from './index';

/** In-memory store of buffers, addressed by a caller-chosen string key. */
const cache = new Map<string, Buffer>();

/** Returns the buffer stored under `key`, or `undefined` when there is none. */
export const cacheGet = (key: string): Buffer | undefined => {
  return cache.get(key);
};

/** Stores `value` under `key`, replacing any previous entry; returns the underlying map. */
export const cacheSet = (key: string, value: Buffer): Map<string, Buffer> => {
  return cache.set(key, value);
};

/** Reports whether an entry exists for `key`. */
export const cacheHas = (key: string): boolean => {
  return cache.has(key);
};

/** Removes every entry from the cache. */
export const cacheClear = (): void => {
  cache.clear();
};

/** Removes the entry for `key`; returns `true` only if an entry was actually removed. */
export const cacheDelete = (key: string): boolean => {
  return cache.delete(key);
};

/** Builds a cache key by appending the `.tar.gz` suffix to `key`. */
export const getCacheKey = (key: string): string => {
  return `${key}.tar.gz`;
};

/** Maps a package key (derived from name + version via `pkgToPkgKey`) to its archive location. */
const archiveLocationCache = new Map<string, string>();

/** Looks up the archive location recorded for the given package name/version, if any. */
export const getArchiveLocation = (name: string, version: string) => {
  const pkgKey = pkgToPkgKey({ name, version });
  return archiveLocationCache.get(pkgKey);
};

/** Records `location` as the archive location for the given package name/version. */
export const setArchiveLocation = (name: string, version: string, location: string) => {
  const pkgKey = pkgToPkgKey({ name, version });
  return archiveLocationCache.set(pkgKey, location);
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import tar from 'tar';
import yauzl from 'yauzl';
import { bufferToStream, streamToBuffer } from './streams';

export interface ArchiveEntry {
Expand All @@ -30,3 +31,40 @@ export async function untarBuffer(
deflatedStream.pipe(inflateStream);
});
}

/**
 * Walks every entry of a zip archive held in memory.
 *
 * Entries are read one at a time (`lazyEntries`): the next entry is only requested
 * once the current one has been skipped or fully handled.
 *
 * @param buffer  The raw zip archive.
 * @param filter  Called with `{ path }` for each entry; entries for which it returns
 *                `false` are skipped without being decompressed.
 * @param onEntry Called with `{ path, buffer }` for each entry that passed `filter`.
 * @returns A promise that resolves when the archive emits `end`, and rejects if the
 *          archive emits `error` or if handling any single entry fails.
 */
export async function unzipBuffer(
  buffer: Buffer,
  filter = (entry: ArchiveEntry): boolean => true,
  onEntry = (entry: ArchiveEntry): void => {}
): Promise<void> {
  const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });

  const processEntry = async (entry: yauzl.Entry): Promise<void> => {
    const path = entry.fileName;
    if (filter({ path })) {
      const readStream = await getZipReadStream(zipfile, entry);
      const entryBuffer = await streamToBuffer(readStream);
      onEntry({ buffer: entryBuffer, path });
    }
    // Ask for the next entry only after this one has been dealt with.
    zipfile.readEntry();
  };

  return new Promise<void>((resolve, reject) => {
    // Attach all listeners before the first read is requested so no event can be missed.
    zipfile.on('end', () => resolve());
    zipfile.on('error', reject);
    zipfile.on('entry', (entry: yauzl.Entry) => {
      // An event emitter ignores the promise returned by a listener; forward failures
      // explicitly so the caller sees a rejection instead of a promise that never settles.
      processEntry(entry).catch(reject);
    });
    zipfile.readEntry();
  });
}

/**
 * Promise wrapper around the callback-based `yauzl.fromBuffer`.
 * Rejects with the error passed to the callback, otherwise resolves with the zip handle.
 */
function yauzlFromBuffer(buffer: Buffer, opts: yauzl.Options): Promise<yauzl.ZipFile> {
  return new Promise((resolve, reject) => {
    yauzl.fromBuffer(buffer, opts, (err?: Error, handle?: yauzl.ZipFile) => {
      if (err) {
        reject(err);
      } else {
        resolve(handle);
      }
    });
  });
}

/**
 * Promise wrapper around the callback-based `zipfile.openReadStream`.
 * Rejects with the error passed to the callback, otherwise resolves with the
 * readable stream for `entry`.
 */
function getZipReadStream(
  zipfile: yauzl.ZipFile,
  entry: yauzl.Entry
): Promise<NodeJS.ReadableStream> {
  return new Promise((resolve, reject) => {
    zipfile.openReadStream(entry, (err?: Error, readStream?: NodeJS.ReadableStream) => {
      if (err) {
        reject(err);
      } else {
        resolve(readStream);
      }
    });
  });
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@
*/

import { AssetParts } from '../../../types';
import { pathParts, splitPkgKey } from './index';
import { getBufferExtractor, pathParts, splitPkgKey } from './index';
import { getArchiveLocation } from './cache';
import { untarBuffer, unzipBuffer } from './extract';

// Replace the cache module with a stub exposing only `getArchiveLocation`,
// so each test can decide which archive location is reported.
jest.mock('./cache', () => ({
  getArchiveLocation: jest.fn(),
}));

// Same function as the import above, typed as a mock so tests can program its return value.
const mockedGetArchiveLocation = getArchiveLocation as jest.Mock;

const testPaths = [
{
Expand Down Expand Up @@ -80,3 +90,21 @@ describe('splitPkgKey tests', () => {
expect(pkgVersion).toBe('0.13.0-alpha.1+abcd');
});
});

describe('getBufferExtractor', () => {
  // Start every test from a mock with no programmed behavior (it returns undefined),
  // so no test depends on running before or after another one.
  beforeEach(() => {
    mockedGetArchiveLocation.mockReset();
  });

  it('throws if the archive has not been downloaded/cached yet', () => {
    expect(() => getBufferExtractor('missing', '1.2.3')).toThrow('no archive location');
  });

  it('returns unzipBuffer if the archive key ends in .zip', () => {
    mockedGetArchiveLocation.mockImplementation(() => '.zip');
    const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
    expect(extractor).toBe(unzipBuffer);
  });

  it('returns untarBuffer if the key ends in anything else', () => {
    mockedGetArchiveLocation.mockImplementation(() => 'xyz');
    const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
    expect(extractor).toBe(untarBuffer);
  });
});
45 changes: 23 additions & 22 deletions x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {
RegistrySearchResults,
RegistrySearchResult,
} from '../../../types';
import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache';
import { ArchiveEntry, untarBuffer } from './extract';
import { cacheGet, cacheSet, cacheHas, getArchiveLocation, setArchiveLocation } from './cache';
import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract';
import { fetchUrl, getResponse, getResponseStream } from './requests';
import { streamToBuffer } from './streams';
import { getRegistryUrl } from './registry_url';
Expand Down Expand Up @@ -130,17 +130,17 @@ export async function getArchiveInfo(
filter = (entry: ArchiveEntry): boolean => true
): Promise<string[]> {
const paths: string[] = [];
const onEntry = (entry: ArchiveEntry) => {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
const bufferExtractor = getBufferExtractor(pkgName, pkgVersion);
await bufferExtractor(archiveBuffer, filter, (entry: ArchiveEntry) => {
const { path, buffer } = entry;
const { file } = pathParts(path);
if (!file) return;
if (buffer) {
cacheSet(path, buffer);
paths.push(path);
}
};

await extract(pkgName, pkgVersion, filter, onEntry);
});

return paths;
}
Expand Down Expand Up @@ -175,24 +175,20 @@ export function pathParts(path: string): AssetParts {
} as AssetParts;
}

async function extract(
pkgName: string,
pkgVersion: string,
filter = (entry: ArchiveEntry): boolean => true,
onEntry: (entry: ArchiveEntry) => void
) {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
/**
 * Picks the decompression function for a package's archive, based on the archive
 * location previously recorded for that name/version.
 *
 * @returns `unzipBuffer` when the recorded location ends in `.zip`; `untarBuffer` otherwise.
 * @throws Error when no archive location has been recorded for the package.
 */
export function getBufferExtractor(pkgName: string, pkgVersion: string) {
  const archiveLocation = getArchiveLocation(pkgName, pkgVersion);
  if (!archiveLocation) throw new Error(`no archive location for ${pkgName} ${pkgVersion}`);

  // Any location that does not end in .zip is handled as a tar archive.
  const isZip = archiveLocation.endsWith('.zip');
  return isZip ? unzipBuffer : untarBuffer;
}

async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
// assume .tar.gz for now. add support for .zip if/when we need it
const key = getCacheKey(`${pkgName}-${pkgVersion}`);
let buffer = cacheGet(key);
const key = getArchiveLocation(pkgName, pkgVersion);
let buffer = key && cacheGet(key);
if (!buffer) {
buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
cacheSet(key, buffer);
}

if (buffer) {
Expand All @@ -203,16 +199,21 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro
}

/**
 * Makes sure the archive for the given package has been processed by `getArchiveInfo`.
 *
 * `getArchiveInfo` is invoked when no archive location is recorded for the package yet,
 * or when the buffer cache has no entry under that location.
 */
export async function ensureCachedArchiveInfo(name: string, version: string): Promise<void> {
  const archiveLocation = getArchiveLocation(name, version);
  if (!archiveLocation || !cacheHas(archiveLocation)) {
    await getArchiveInfo(name, version);
  }
}

/**
 * Downloads the archive for a package from the registry.
 *
 * The registry-relative `download` path reported by `fetchInfo` is recorded as the
 * package's archive location, and the downloaded buffer is cached under that same path.
 *
 * @returns The raw (still compressed) archive contents.
 */
async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
  const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
  const archiveUrl = `${getRegistryUrl()}${archivePath}`;
  const responseStream = await getResponseStream(archiveUrl);
  const buffer = await streamToBuffer(responseStream);

  // Record the location only after a successful download, together with the buffer,
  // so a recorded location always has a matching cache entry at this point.
  setArchiveLocation(pkgName, pkgVersion, archivePath);
  cacheSet(archivePath, buffer);

  return buffer;
}

export function getAsset(key: string) {
Expand Down

0 comments on commit 9a7c418

Please sign in to comment.