Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Support both zip & tar archives from Registry #76197

Merged
merged 7 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
"@kbn/test-subj-selector": "0.2.1",
"@kbn/ui-framework": "1.0.0",
"@kbn/ui-shared-deps": "1.0.0",
"@types/yauzl": "^2.9.1",
"JSONStream": "1.3.5",
"abortcontroller-polyfill": "^1.4.0",
"accept": "3.0.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { pkgToPkgKey } from './index';

const cache: Map<string, Buffer> = new Map();
export const cacheGet = (key: string) => cache.get(key);
export const cacheSet = (key: string, value: Buffer) => cache.set(key, value);
export const cacheHas = (key: string) => cache.has(key);
export const cacheClear = () => cache.clear();
export const cacheDelete = (key: string) => cache.delete(key);
export const getCacheKey = (key: string) => key + '.tar.gz';

const archiveLocationCache: Map<string, string> = new Map();
export const getArchiveLocation = (name: string, version: string) =>
archiveLocationCache.get(pkgToPkgKey({ name, version }));

export const setArchiveLocation = (name: string, version: string, location: string) =>
archiveLocationCache.set(pkgToPkgKey({ name, version }), location);
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import tar from 'tar';
import yauzl from 'yauzl';
import { bufferToStream, streamToBuffer } from './streams';

export interface ArchiveEntry {
Expand All @@ -30,3 +31,40 @@ export async function untarBuffer(
deflatedStream.pipe(inflateStream);
});
}

export async function unzipBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
): Promise<void> {
const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
zipfile.readEntry();
zipfile.on('entry', async (entry: yauzl.Entry) => {
const path = entry.fileName;
if (!filter({ path })) return zipfile.readEntry();

const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
onEntry({ buffer: entryBuffer, path });
zipfile.readEntry();
});
return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
}

function yauzlFromBuffer(buffer: Buffer, opts: yauzl.Options): Promise<yauzl.ZipFile> {
return new Promise((resolve, reject) =>
yauzl.fromBuffer(buffer, opts, (err?: Error, handle?: yauzl.ZipFile) =>
err ? reject(err) : resolve(handle)
)
);
}

function getZipReadStream(
zipfile: yauzl.ZipFile,
entry: yauzl.Entry
): Promise<NodeJS.ReadableStream> {
return new Promise((resolve, reject) =>
zipfile.openReadStream(entry, (err?: Error, readStream?: NodeJS.ReadableStream) =>
err ? reject(err) : resolve(readStream)
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@
*/

import { AssetParts } from '../../../types';
import { pathParts, splitPkgKey } from './index';
import { getBufferExtractor, pathParts, splitPkgKey } from './index';
import { getArchiveLocation } from './cache';
import { untarBuffer, unzipBuffer } from './extract';

jest.mock('./cache', () => {
return {
getArchiveLocation: jest.fn(),
};
});

const mockedGetArchiveLocation = getArchiveLocation as jest.Mock;

const testPaths = [
{
Expand Down Expand Up @@ -80,3 +90,21 @@ describe('splitPkgKey tests', () => {
expect(pkgVersion).toBe('0.13.0-alpha.1+abcd');
});
});

describe('getBufferExtractor', () => {
it('throws if the archive has not been downloaded/cached yet', () => {
expect(() => getBufferExtractor('missing', '1.2.3')).toThrow('no archive location');
});

it('returns unzipBuffer if the archive key ends in .zip', () => {
mockedGetArchiveLocation.mockImplementation(() => '.zip');
const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
expect(extractor).toBe(unzipBuffer);
});

it('returns untarBuffer if the key ends in anything else', () => {
mockedGetArchiveLocation.mockImplementation(() => 'xyz');
const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
expect(extractor).toBe(untarBuffer);
});
});
45 changes: 23 additions & 22 deletions x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {
RegistrySearchResults,
RegistrySearchResult,
} from '../../../types';
import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache';
import { ArchiveEntry, untarBuffer } from './extract';
import { cacheGet, cacheSet, cacheHas, getArchiveLocation, setArchiveLocation } from './cache';
import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract';
import { fetchUrl, getResponse, getResponseStream } from './requests';
import { streamToBuffer } from './streams';
import { getRegistryUrl } from './registry_url';
Expand Down Expand Up @@ -130,17 +130,17 @@ export async function getArchiveInfo(
filter = (entry: ArchiveEntry): boolean => true
): Promise<string[]> {
const paths: string[] = [];
const onEntry = (entry: ArchiveEntry) => {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
const bufferExtractor = getBufferExtractor(pkgName, pkgVersion);
await bufferExtractor(archiveBuffer, filter, (entry: ArchiveEntry) => {
const { path, buffer } = entry;
const { file } = pathParts(path);
if (!file) return;
if (buffer) {
cacheSet(path, buffer);
paths.push(path);
}
};

await extract(pkgName, pkgVersion, filter, onEntry);
});

return paths;
}
Expand Down Expand Up @@ -175,24 +175,20 @@ export function pathParts(path: string): AssetParts {
} as AssetParts;
}

async function extract(
pkgName: string,
pkgVersion: string,
filter = (entry: ArchiveEntry): boolean => true,
onEntry: (entry: ArchiveEntry) => void
) {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
export function getBufferExtractor(pkgName: string, pkgVersion: string) {
const archiveLocation = getArchiveLocation(pkgName, pkgVersion);
if (!archiveLocation) throw new Error(`no archive location for ${pkgName} ${pkgVersion}`);
const isZip = archiveLocation.endsWith('.zip');
const bufferExtractor = isZip ? unzipBuffer : untarBuffer;

return untarBuffer(archiveBuffer, filter, onEntry);
return bufferExtractor;
}

async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
// assume .tar.gz for now. add support for .zip if/when we need it
const key = getCacheKey(`${pkgName}-${pkgVersion}`);
let buffer = cacheGet(key);
const key = getArchiveLocation(pkgName, pkgVersion);
let buffer = key && cacheGet(key);
if (!buffer) {
buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
cacheSet(key, buffer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the caching from this getOrFetch function to inside the fetchArchiveBuffer

}

if (buffer) {
Expand All @@ -203,16 +199,21 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro
}

export async function ensureCachedArchiveInfo(name: string, version: string) {
const pkgkey = getCacheKey(`${name}-${version}`);
if (!cacheHas(pkgkey)) {
const pkgkey = getArchiveLocation(name, version);
if (!pkgkey || !cacheHas(pkgkey)) {
await getArchiveInfo(name, version);
}
}

async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
const registryUrl = getRegistryUrl();
return getResponseStream(`${registryUrl}${archivePath}`).then(streamToBuffer);
const archiveUrl = `${getRegistryUrl()}${archivePath}`;
const buffer = await getResponseStream(archiveUrl).then(streamToBuffer);

setArchiveLocation(pkgName, pkgVersion, archivePath);
cacheSet(archivePath, buffer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure we always cache a fetched archive buffer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this cache invalidated if the current package is removed? This is mainly important for package development.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin No, but that's the same behavior as now. I'm not sure if this means it is an issue or if it's being avoided/worked around in some other way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skh @neptunian I thought one of you solve this issue recently or at least debugged it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is #68890 -- it's still open because I misunderstood the original description and tested the wrong things, and then it got moved back in the queue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skh Thanks for the link. This basically means we can move forward with what we have in this PR and will follow up later in #68890


return buffer;
}

export function getAsset(key: string) {
Expand Down