Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Support both zip & tar archives from Registry #76197

Merged
merged 7 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
"@kbn/test-subj-selector": "0.2.1",
"@kbn/ui-framework": "1.0.0",
"@kbn/ui-shared-deps": "1.0.0",
"@types/yauzl": "^2.9.1",
"JSONStream": "1.3.5",
"abortcontroller-polyfill": "^1.4.0",
"accept": "3.0.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,12 @@ const cache: Map<string, Buffer> = new Map();
export const cacheGet = (key: string) => cache.get(key);
export const cacheSet = (key: string, value: Buffer) => cache.set(key, value);
export const cacheHas = (key: string) => cache.has(key);
export const getCacheKey = (key: string) => key + '.tar.gz';

const archiveKeyCache: Map<string, string> = new Map();
const stableKey = ({ name, version }: { name: string; version: string }) => `${name}-${version}`;

export const getArchiveKey = (name: string, version: string) =>
archiveKeyCache.get(stableKey({ name, version }));

export const setArchiveKey = (name: string, version: string, location: string) =>
archiveKeyCache.set(stableKey({ name, version }), location);
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import tar from 'tar';
import yauzl from 'yauzl';
import { bufferToStream, streamToBuffer } from './streams';

export interface ArchiveEntry {
Expand All @@ -30,3 +31,40 @@ export async function untarBuffer(
deflatedStream.pipe(inflateStream);
});
}

export async function unzipBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
): Promise<void> {
const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
zipfile.readEntry();
zipfile.on('entry', async (entry: yauzl.Entry) => {
const path = entry.fileName;
if (!filter({ path })) return zipfile.readEntry();

const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
onEntry({ buffer: entryBuffer, path });
zipfile.readEntry();
});
return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
}

function yauzlFromBuffer(buffer: Buffer, opts: yauzl.Options): Promise<yauzl.ZipFile> {
return new Promise((resolve, reject) =>
yauzl.fromBuffer(buffer, opts, (err?: Error, handle?: yauzl.ZipFile) =>
err ? reject(err) : resolve(handle)
)
);
}

function getZipReadStream(
zipfile: yauzl.ZipFile,
entry: yauzl.Entry
): Promise<NodeJS.ReadableStream> {
return new Promise((resolve, reject) =>
zipfile.openReadStream(entry, (err?: Error, readStream?: NodeJS.ReadableStream) =>
err ? reject(err) : resolve(readStream)
)
);
}
30 changes: 19 additions & 11 deletions x-pack/plugins/ingest_manager/server/services/epm/registry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import {
RegistrySearchResults,
RegistrySearchResult,
} from '../../../types';
import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache';
import { ArchiveEntry, untarBuffer } from './extract';
import { cacheGet, cacheSet, cacheHas, getArchiveKey, setArchiveKey } from './cache';
import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract';
import { fetchUrl, getResponse, getResponseStream } from './requests';
import { streamToBuffer } from './streams';
import { getRegistryUrl } from './registry_url';
Expand Down Expand Up @@ -182,17 +182,20 @@ async function extract(
onEntry: (entry: ArchiveEntry) => void
) {
const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
const archiveKey = getArchiveKey(pkgName, pkgVersion);
// shouldn't be possible since getOrFetchArchiveBuffer -> fetchArchiveBuffer sets the key
if (!archiveKey) throw new Error('no archive key');
const isZip = archiveKey.endsWith('.zip');
const libExtract = isZip ? unzipBuffer : untarBuffer;

return untarBuffer(archiveBuffer, filter, onEntry);
return libExtract(archiveBuffer, filter, onEntry);
}

async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
// assume .tar.gz for now. add support for .zip if/when we need it
const key = getCacheKey(`${pkgName}-${pkgVersion}`);
let buffer = cacheGet(key);
const key = getArchiveKey(pkgName, pkgVersion);
let buffer = key && cacheGet(key);
if (!buffer) {
buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
cacheSet(key, buffer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the caching from this getOrFetch function to inside the fetchArchiveBuffer

}

if (buffer) {
Expand All @@ -203,16 +206,21 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro
}

export async function ensureCachedArchiveInfo(name: string, version: string) {
const pkgkey = getCacheKey(`${name}-${version}`);
if (!cacheHas(pkgkey)) {
const pkgkey = getArchiveKey(name, version);
if (!pkgkey || !cacheHas(pkgkey)) {
await getArchiveInfo(name, version);
}
}

async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
const registryUrl = getRegistryUrl();
return getResponseStream(`${registryUrl}${archivePath}`).then(streamToBuffer);
const archiveUrl = `${getRegistryUrl()}${archivePath}`;
const buffer = await getResponseStream(archiveUrl).then(streamToBuffer);

setArchiveKey(pkgName, pkgVersion, archivePath);
cacheSet(archivePath, buffer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure we always cache a fetched archive buffer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this cache invalidated if the current package is removed? This is mainly important for package development.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin No, but that's the same behavior as now. I'm not sure if this means it is an issue or if it's being avoided/worked around in some other way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skh @neptunian I thought one of you solve this issue recently or at least debugged it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is #68890 -- it's still open because I misunderstood the original description and tested the wrong things, and then it got moved back in the queue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skh Thanks for the link. This basically means we can move forward with what we have in this PR and will follow up later in #68890


return buffer;
}

export function getAsset(key: string) {
Expand Down