Skip to content

Commit

Permalink
fix(downloads): make path/saveAs work when connected remotely (#3634)
Browse files Browse the repository at this point in the history
- saveAs uses a stream internally and pipes it to the local file;
- path throws an error when called on a remote browser.
  • Loading branch information
dgozman authored Aug 26, 2020
1 parent a87614a commit 8d7ec3a
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/client/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class Browser extends ChannelOwner<channels.BrowserChannel, channels.Brow
private _isClosedOrClosing = false;
private _closedPromise: Promise<void>;
readonly _browserType: BrowserType;
_isRemote = false;

static from(browser: channels.BrowserChannel): Browser {
return (browser as any)._object;
Expand Down
1 change: 1 addition & 0 deletions src/client/browserType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export class BrowserType extends ChannelOwner<channels.BrowserTypeChannel, chann
ws.addEventListener('open', async () => {
const browser = (await connection.waitForObjectWithKnownName('connectedBrowser')) as Browser;
browser._logger = logger;
browser._isRemote = true;
const closeListener = () => {
// Emulate all pages, contexts and the browser closing upon disconnect.
for (const context of browser.contexts()) {
Expand Down
23 changes: 22 additions & 1 deletion src/client/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ import * as channels from '../protocol/channels';
import { ChannelOwner } from './channelOwner';
import { Readable } from 'stream';
import { Stream } from './stream';
import { Browser } from './browser';
import { BrowserContext } from './browserContext';
import * as fs from 'fs';
import { mkdirIfNeeded } from '../utils/utils';

export class Download extends ChannelOwner<channels.DownloadChannel, channels.DownloadInitializer> {
private _browser: Browser | undefined;

static from(download: channels.DownloadChannel): Download {
return (download as any)._object;
}

constructor(parent: ChannelOwner, type: string, guid: string, initializer: channels.DownloadInitializer) {
super(parent, type, guid, initializer);
this._browser = (parent as BrowserContext)._browser;
}

url(): string {
Expand All @@ -37,12 +44,26 @@ export class Download extends ChannelOwner<channels.DownloadChannel, channels.Do
}

async path(): Promise<string | null> {
if (this._browser && this._browser._isRemote)
throw new Error(`Path is not available when using browserType.connect(). Use download.saveAs() to save a local copy.`);
return (await this._channel.path()).value || null;
}

async saveAs(path: string): Promise<void> {
return this._wrapApiCall('download.saveAs', async () => {
await this._channel.saveAs({ path });
if (!this._browser || !this._browser._isRemote) {
await this._channel.saveAs({ path });
return;
}

const result = await this._channel.saveAsStream();
const stream = Stream.from(result.stream);
await mkdirIfNeeded(path);
await new Promise((resolve, reject) => {
stream.stream().pipe(fs.createWriteStream(path))
.on('finish' as any, resolve)
.on('error' as any, reject);
});
});
}

Expand Down
6 changes: 6 additions & 0 deletions src/client/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ class StreamImpl extends Readable {
else
this.push(null);
}

_destroy(error: Error | null, callback: (error: Error | null) => void): void {
// Stream might be destroyed after the connection was closed.
this._channel.close().catch(e => null);
super._destroy(error, callback);
}
}
60 changes: 53 additions & 7 deletions src/dispatchers/downloadDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import { Download } from '../server/download';
import * as channels from '../protocol/channels';
import { Dispatcher, DispatcherScope } from './dispatcher';
import { StreamDispatcher } from './streamDispatcher';
import * as fs from 'fs';
import * as util from 'util';
import { mkdirIfNeeded } from '../utils/utils';

export class DownloadDispatcher extends Dispatcher<Download, channels.DownloadInitializer> implements channels.DownloadChannel {
constructor(scope: DispatcherScope, download: Download) {
Expand All @@ -28,20 +31,63 @@ export class DownloadDispatcher extends Dispatcher<Download, channels.DownloadIn
}

async path(): Promise<channels.DownloadPathResult> {
const path = await this._object.path();
const path = await this._object.localPath();
return { value: path || undefined };
}

async saveAs(params: channels.DownloadSaveAsParams): Promise<void> {
await this._object.saveAs(params.path);
async saveAs(params: channels.DownloadSaveAsParams): Promise<channels.DownloadSaveAsResult> {
return await new Promise((resolve, reject) => {
this._object.saveAs(async (localPath, error) => {
if (error !== undefined) {
reject(error);
return;
}

try {
await mkdirIfNeeded(params.path);
await util.promisify(fs.copyFile)(localPath, params.path);
resolve();
} catch (e) {
reject(e);
}
});
});
}

async saveAsStream(): Promise<channels.DownloadSaveAsStreamResult> {
return await new Promise((resolve, reject) => {
this._object.saveAs(async (localPath, error) => {
if (error !== undefined) {
reject(error);
return;
}

try {
const readable = fs.createReadStream(localPath);
await new Promise(f => readable.on('readable', f));
const stream = new StreamDispatcher(this._scope, readable);
// Resolve with a stream, so that client starts saving the data.
resolve({ stream });
// Block the download until the stream is consumed.
await new Promise<void>(resolve => {
readable.on('close', resolve);
readable.on('end', resolve);
readable.on('error', resolve);
});
} catch (e) {
reject(e);
}
});
});
}

async stream(): Promise<channels.DownloadStreamResult> {
const stream = await this._object.createReadStream();
if (!stream)
const fileName = await this._object.localPath();
if (!fileName)
return {};
await new Promise(f => stream.on('readable', f));
return { stream: new StreamDispatcher(this._scope, stream) };
const readable = fs.createReadStream(fileName);
await new Promise(f => readable.on('readable', f));
return { stream: new StreamDispatcher(this._scope, readable) };
}

async failure(): Promise<channels.DownloadFailureResult> {
Expand Down
4 changes: 4 additions & 0 deletions src/dispatchers/streamDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ export class StreamDispatcher extends Dispatcher<stream.Readable, channels.Strea
const buffer = this._object.read(Math.min(this._object.readableLength, params.size || this._object.readableLength));
return { binary: buffer ? buffer.toString('base64') : '' };
}

async close() {
this._object.destroy();
}
}
10 changes: 10 additions & 0 deletions src/protocol/channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2104,6 +2104,7 @@ export type DownloadInitializer = {
export interface DownloadChannel extends Channel {
path(params?: DownloadPathParams): Promise<DownloadPathResult>;
saveAs(params: DownloadSaveAsParams): Promise<DownloadSaveAsResult>;
saveAsStream(params?: DownloadSaveAsStreamParams): Promise<DownloadSaveAsStreamResult>;
failure(params?: DownloadFailureParams): Promise<DownloadFailureResult>;
stream(params?: DownloadStreamParams): Promise<DownloadStreamResult>;
delete(params?: DownloadDeleteParams): Promise<DownloadDeleteResult>;
Expand All @@ -2120,6 +2121,11 @@ export type DownloadSaveAsOptions = {

};
export type DownloadSaveAsResult = void;
export type DownloadSaveAsStreamParams = {};
export type DownloadSaveAsStreamOptions = {};
export type DownloadSaveAsStreamResult = {
stream: StreamChannel,
};
export type DownloadFailureParams = {};
export type DownloadFailureOptions = {};
export type DownloadFailureResult = {
Expand All @@ -2138,6 +2144,7 @@ export type DownloadDeleteResult = void;
export type StreamInitializer = {};
export interface StreamChannel extends Channel {
read(params: StreamReadParams): Promise<StreamReadResult>;
close(params?: StreamCloseParams): Promise<StreamCloseResult>;
}
export type StreamReadParams = {
size?: number,
Expand All @@ -2148,6 +2155,9 @@ export type StreamReadOptions = {
export type StreamReadResult = {
binary: Binary,
};
export type StreamCloseParams = {};
export type StreamCloseOptions = {};
export type StreamCloseResult = void;

// ----------- CDPSession -----------
export type CDPSessionInitializer = {};
Expand Down
7 changes: 7 additions & 0 deletions src/protocol/protocol.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1769,10 +1769,16 @@ Download:
returns:
value: string?

# Blocks path/failure/delete/context.close until saved to the local |path|.
saveAs:
parameters:
path: string

# Blocks path/failure/delete/context.close until the stream is closed.
saveAsStream:
returns:
stream: Stream

failure:
returns:
error: string?
Expand All @@ -1796,6 +1802,7 @@ Stream:
returns:
binary: binary

close:


CDPSession:
Expand Down
2 changes: 2 additions & 0 deletions src/protocol/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,14 @@ export function createScheme(tChannel: (name: string) => Validator): Scheme {
scheme.DownloadSaveAsParams = tObject({
path: tString,
});
scheme.DownloadSaveAsStreamParams = tOptional(tObject({}));
scheme.DownloadFailureParams = tOptional(tObject({}));
scheme.DownloadStreamParams = tOptional(tObject({}));
scheme.DownloadDeleteParams = tOptional(tObject({}));
scheme.StreamReadParams = tObject({
size: tOptional(tNumber),
});
scheme.StreamCloseParams = tOptional(tObject({}));
scheme.CDPSessionSendParams = tObject({
method: tString,
params: tOptional(tAny),
Expand Down
45 changes: 15 additions & 30 deletions src/server/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import * as path from 'path';
import * as fs from 'fs';
import * as util from 'util';
import { Page } from './page';
import { Readable } from 'stream';
import { assert, mkdirIfNeeded } from '../utils/utils';
import { assert } from '../utils/utils';

type SaveCallback = (localPath: string, error?: string) => Promise<void>;

export class Download {
private _downloadsPath: string;
private _uuid: string;
private _finishedCallback: () => void;
private _finishedPromise: Promise<void>;
private _saveAsRequests: { fulfill: () => void; reject: (error?: any) => void; path: string }[] = [];
private _saveCallbacks: SaveCallback[] = [];
private _finished: boolean = false;
private _page: Page;
private _acceptDownloads: boolean;
Expand Down Expand Up @@ -63,7 +64,7 @@ export class Download {
return this._suggestedFilename!;
}

async path(): Promise<string | null> {
async localPath(): Promise<string | null> {
if (!this._acceptDownloads)
throw new Error('Pass { acceptDownloads: true } when you are creating your browser context.');
const fileName = path.join(this._downloadsPath, this._uuid);
Expand All @@ -73,7 +74,7 @@ export class Download {
return fileName;
}

async saveAs(path: string) {
saveAs(saveCallback: SaveCallback) {
if (!this._acceptDownloads)
throw new Error('Pass { acceptDownloads: true } when you are creating your browser context.');
if (this._deleted)
Expand All @@ -82,17 +83,10 @@ export class Download {
throw new Error('Download not found on disk. Check download.failure() for details.');

if (this._finished) {
await this._saveAs(path);
saveCallback(path.join(this._downloadsPath, this._uuid));
return;
}

return new Promise((fulfill, reject) => this._saveAsRequests.push({fulfill, reject, path}));
}

async _saveAs(downloadPath: string) {
const fileName = path.join(this._downloadsPath, this._uuid);
await mkdirIfNeeded(downloadPath);
await util.promisify(fs.copyFile)(fileName, downloadPath);
this._saveCallbacks.push(saveCallback);
}

async failure(): Promise<string | null> {
Expand All @@ -102,15 +96,10 @@ export class Download {
return this._failure;
}

async createReadStream(): Promise<Readable | null> {
const fileName = await this.path();
return fileName ? fs.createReadStream(fileName) : null;
}

async delete(): Promise<void> {
if (!this._acceptDownloads)
return;
const fileName = await this.path();
const fileName = await this.localPath();
if (this._deleted)
return;
this._deleted = true;
Expand All @@ -123,18 +112,14 @@ export class Download {
this._failure = error || null;

if (error) {
for (const { reject } of this._saveAsRequests)
reject(error);
for (const callback of this._saveCallbacks)
callback('', error);
} else {
for (const { fulfill, reject, path } of this._saveAsRequests) {
try {
await this._saveAs(path);
fulfill();
} catch (err) {
reject(err);
}
}
const fullPath = path.join(this._downloadsPath, this._uuid);
for (const callback of this._saveCallbacks)
await callback(fullPath);
}
this._saveCallbacks = [];

this._finishedCallback();
}
Expand Down
46 changes: 0 additions & 46 deletions test/browsertype-connect-subprocess.spec.ts

This file was deleted.

Loading

0 comments on commit 8d7ec3a

Please sign in to comment.