Skip to content

Commit

Permalink
refactor: factor cancellation to base class
Browse files Browse the repository at this point in the history
  • Loading branch information
dvirtz committed Jul 30, 2023
1 parent 198367b commit 1e5a261
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 129 deletions.
42 changes: 13 additions & 29 deletions src/arrow-backend.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow';
import { PassThrough } from 'stream';
import { CancellationToken } from 'vscode';
import { getLogger } from './logger';
import { ParquetBackend } from './parquet-backend';
import { jsonSpace } from './settings';
import { readParquet } from 'parquet-reader';
Expand All @@ -14,36 +13,21 @@ function bigIntToJson(value: bigint) {
return value.toString();
}

export class ArrowBackend implements ParquetBackend {
public async * toJson(parquetPath: string, token?: CancellationToken | undefined): AsyncGenerator<string> {
getLogger().info(`opening ${parquetPath}`)
try {
const stream = new PassThrough;
readParquet(parquetPath, stream);
const batches = await AsyncRecordBatchStreamReader.from(stream);
export class ArrowBackend extends ParquetBackend {
public async * toJsonImpl(parquetPath: string, _token?: CancellationToken): AsyncGenerator<string> {
const stream = new PassThrough;
readParquet(parquetPath, stream);
const batches = await AsyncRecordBatchStreamReader.from(stream);

// read all records from the file and print them
for await (const batch of batches) {
for await (const row of batch) {
if (token?.isCancellationRequested) {
break;
}
yield JSON.stringify(row, (key, value) => {
return typeof value === 'bigint'
? bigIntToJson(value)
: value // return everything else unchanged
}, jsonSpace());
}
// read all records from the file and print them
for await (const batch of batches) {
for await (const row of batch) {
yield JSON.stringify(row, (key, value) => {
return typeof value === 'bigint'
? bigIntToJson(value)
: value // return everything else unchanged
}, jsonSpace());
}
} catch (error) {
const message = `while reading ${parquetPath}: ${error}`;
getLogger().error(message);
throw Error(message);
}

if (token?.isCancellationRequested) {
getLogger().info(`parsing ${parquetPath} was cancelled by user`);
}
}

}
18 changes: 15 additions & 3 deletions src/parquet-backend.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import * as vscode from 'vscode';
import { CancellationToken } from 'vscode';
import { getLogger } from './logger';

export interface ParquetBackend {
toJson(parquetPath: string, token?: vscode.CancellationToken): AsyncGenerator<string>;
export abstract class ParquetBackend {
public async* toJson(parquetPath: string, token?: CancellationToken) {
getLogger().info(`opening ${parquetPath}`)
for await (const line of this.toJsonImpl(parquetPath, token)) {
if (token?.isCancellationRequested) {
getLogger().info(`parsing ${parquetPath} was cancelled by user`);
break;
}
yield line;
}
}

abstract toJsonImpl(parquetPath: string, token?: CancellationToken): AsyncGenerator<string>;
}
67 changes: 33 additions & 34 deletions src/parquet-document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,8 @@ export default class ParquetDocument implements vscode.Disposable {
this._parquetPath = this._uri.fsPath.replace(/\.as\.json$/, '');
const watcher = vscode.workspace.createFileSystemWatcher(new vscode.RelativePattern(this._parquetPath, "*"));
this._disposables.push(watcher);
this._disposables.push(watcher.onDidChange(async uri => {
assert(uri.fsPath == this._parquetPath);
return await this.populate();
}));
this._disposables.push(watcher.onDidCreate(async uri => {
assert(uri.fsPath == this._parquetPath);
return await this.populate();
}));
this._disposables.push(watcher.onDidChange(this.tryPopulate.bind(this)));
this._disposables.push(watcher.onDidCreate(this.tryPopulate.bind(this)));
}

dispose() {
Expand All @@ -52,44 +46,49 @@ export default class ParquetDocument implements vscode.Disposable {
}

public static async create(uri: vscode.Uri, emitter: vscode.EventEmitter<vscode.Uri>): Promise<ParquetDocument> {
const parquet = new ParquetDocument(uri, emitter);
await parquet.populate();
return parquet;
try {
const parquet = new ParquetDocument(uri, emitter);
await parquet.populate();
return parquet;
} catch (error) {
const message = `while reading ${uri}: ${error}`;
getLogger().error(message);
await vscode.window.showErrorMessage(`${error}`);
throw Error(message);
}
}

get value() {
return this._lines.join(os.EOL) + os.EOL;
}

private tryPopulate(uri: vscode.Uri) {
assert(uri.fsPath == this._parquetPath);
this.populate().catch(error => getLogger().warn(`failed to populate ${this._parquetPath}: ${error}`))
}

private async populate() {
// protect against onCreate firing right after create
try {
const {mtimeMs} = await promises.stat(this._parquetPath);
if (mtimeMs == this._lastMod) {
getLogger().debug("skipping populate() as modification timestamp hasn't changed");
return;
}
this._lastMod = mtimeMs;
} catch (err) {
getLogger().warn(`failed populating ${this._parquetPath}: ${err}`);
const { mtimeMs } = await promises.stat(this._parquetPath);
if (mtimeMs == this._lastMod) {
getLogger().debug("skipping populate() as modification timestamp hasn't changed");
return;
}
this._lastMod = mtimeMs;

const lines: string[] = [];

try {
await vscode.window.withProgress({
location: vscode.ProgressLocation.Notification,
title: `opening ${path.basename(this._parquetPath)}`,
cancellable: true
},
async (progress, token) => {
for await (const line of this._backend.toJson(this._parquetPath, token)) {
lines.push(line);
}
});
} catch (err) {
await vscode.window.showErrorMessage(`${err}`);
}
await vscode.window.withProgress({
location: vscode.ProgressLocation.Notification,
title: `opening ${path.basename(this._parquetPath)}`,
cancellable: true
},
async (progress, token) => {
for await (const line of this._backend.toJson(this._parquetPath, token)) {
lines.push(line);
}
}
);
if (lines != this._lines) {
this._lines = lines;
this._emitter.fire(this._uri);
Expand Down
28 changes: 3 additions & 25 deletions src/parquet-tools-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { parquetTools as getParquetTools } from './settings';
import { createInterface } from 'readline';
import { ParquetBackend } from './parquet-backend';

export class ParquetToolsBackend implements ParquetBackend {
export class ParquetToolsBackend extends ParquetBackend {

public static async* spawnParquetTools(params: string[], token?: vscode.CancellationToken): AsyncGenerator<string> {
const [command, ...args] = await ParquetToolsBackend.parquetToolsPath();
Expand Down Expand Up @@ -54,29 +54,7 @@ export class ParquetToolsBackend implements ParquetBackend {
return [parquetTools];
}

public async * toJson(parquetPath: string, token?: vscode.CancellationToken): AsyncGenerator<string> {
const cancelledMessage = `parsing ${parquetPath} was cancelled by user`;
if (token?.isCancellationRequested) {
getLogger().info(cancelledMessage);
return;
}

token?.onCancellationRequested(_ => {
getLogger().info(cancelledMessage);
});

try {
yield* ParquetToolsBackend.spawnParquetTools(['cat', '-j', parquetPath], token);
} catch (e) {
let message = `while reading ${parquetPath}: `;
message += (_ => {
if (e instanceof Error) {
return e.message;
}
return `${e}`;
})();
getLogger().error(message);
throw Error(message);
}
public async * toJsonImpl(parquetPath: string, token?: vscode.CancellationToken): AsyncGenerator<string> {
yield* ParquetToolsBackend.spawnParquetTools(['cat', '-j', parquetPath], token);
}
}
36 changes: 11 additions & 25 deletions src/parquets-backend.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
import * as vscode from 'vscode';
import { getLogger } from './logger';
import { CancellationToken } from 'vscode';
import { ParquetReader } from '@dvirtz/parquets';
import { ParquetBackend } from './parquet-backend';
import { jsonSpace } from './settings';

export class ParquetsBackend implements ParquetBackend {
public async * toJson(parquetPath: string, token?: vscode.CancellationToken): AsyncGenerator<string> {
const cancelledMessage = `parsing ${parquetPath} was cancelled by user`;
if (token?.isCancellationRequested) {
getLogger().info(cancelledMessage);
return;
}

getLogger().info(`opening ${parquetPath}`)
try {
const reader = await ParquetReader.openFile(parquetPath);
const cursor = reader.getCursor();
export class ParquetsBackend extends ParquetBackend {
public async * toJsonImpl(parquetPath: string, _token?: CancellationToken): AsyncGenerator<string> {
const reader = await ParquetReader.openFile(parquetPath);
const cursor = reader.getCursor();

// read all records from the file and print them
let record = null;
while (!token?.isCancellationRequested && (record = await cursor.next())) {
yield JSON.stringify(record, null, jsonSpace());
}

await reader.close();
} catch (error) {
const message = `while reading ${parquetPath}: ${error}`;
getLogger().error(message);
throw Error(message);
// read all records from the file and print them
let record = null;
while ((record = await cursor.next())) {
yield JSON.stringify(record, null, jsonSpace());
}

await reader.close();
}
}
18 changes: 15 additions & 3 deletions test/unit/arrow-backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ describe("ArrowBackend tests", () => {
});

test("Error on not existing file", async function () {
await expect(toArray(backend.toJson("no-such-file"))).rejects.toMatchObject({
'message': expect.stringMatching(/while reading no-such-file: Error: Failed to open no-such-file: Failed to open local file 'no-such-file'/)
});
await expect(toArray(backend.toJson("no-such-file"))).rejects.toThrow(
"Failed to open no-such-file: Failed to open local file 'no-such-file'"
);
});

test.each([0, 2, 10, "\t", "###"])('Test space %s', async function (space) {
Expand All @@ -51,4 +51,16 @@ describe("ArrowBackend tests", () => {

expect(json).toEqual(expected);
});

test("cancellation", async function () {
const token = {
get isCancellationRequested() {
return this.isCancellationRequestedMock();
},
isCancellationRequestedMock: jest.fn().mockReturnValueOnce(false).mockReturnValue(true),
onCancellationRequested: jest.fn()
};
expect(await toArray(backend.toJson(path.join(workspace, `small.parquet`), token))).toHaveLength(1);
expect(token.isCancellationRequestedMock).toBeCalledTimes(2);
});
});
21 changes: 16 additions & 5 deletions test/unit/parquet-tools-backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,36 @@ jest.mock('vscode', () => {

describe("ParquetToolsBackend tests", () => {
const backend = new ParquetToolsBackend();
const workspace = path.join(rootDir, 'test', 'workspace');

test.each(
["small", "large"]
)('Converts %s parquet to JSON', async function (name) {
const workspace = path.join(rootDir, 'test', 'workspace');
const json = (await toArray(backend.toJson(path.join(workspace, `${name}.parquet`))));
const expected = await toArray(createInterface({input: createReadStream(path.join(workspace, `${name}.json`))}));
const expected = await toArray(createInterface({ input: createReadStream(path.join(workspace, `${name}.json`)) }));

expect(json).toEqual(expected);
});

test("Error on not existing file", async function () {
await expect(toArray(backend.toJson("no-such-file"))).rejects.toMatchObject({
'message': expect.stringMatching(/while reading no-such-file: parquet-tools exited with code 1\n.*java.io.FileNotFoundException: File no-such-file does not exist/s)
});
await expect(toArray(backend.toJson("no-such-file"))).rejects.toThrow(
/parquet-tools exited with code 1\n.*java.io.FileNotFoundException: File no-such-file does not exist/s);
});

test("-h works", async function () {
const stdout = (await ParquetToolsBackend.spawnParquetTools(['-h']).next()).value;
expect(stdout).toContain('parquet-tools cat:');
});

test("cancellation", async function () {
const token = {
get isCancellationRequested() {
return this.isCancellationRequestedMock();
},
isCancellationRequestedMock: jest.fn().mockReturnValueOnce(false).mockReturnValue(true),
onCancellationRequested: jest.fn()
};
expect(await toArray(backend.toJson(path.join(workspace, `small.parquet`), token))).toHaveLength(1);
expect(token.isCancellationRequestedMock).toBeCalledTimes(2);
});
});
20 changes: 15 additions & 5 deletions test/unit/parquets-backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,34 @@ describe("ParquetsBackend tests", () => {
["small", "large"]
)('Converts %s parquet to JSON', async function (name) {
const json = (await toArray(backend.toJson(path.join(workspace, `${name}.parquet`)))).map(line => line.trim());
const expected = await toArray(createInterface({input: createReadStream(path.join(workspace, `${name}.json`))}));
const expected = await toArray(createInterface({ input: createReadStream(path.join(workspace, `${name}.json`)) }));

expect(json).toEqual(expected);
});

test("Error on not existing file", async function () {
await expect(toArray(backend.toJson("no-such-file"))).rejects.toMatchObject({
'message': expect.stringMatching(/while reading no-such-file: Error: ENOENT: no such file or directory, stat '.*no-such-file'/)
});
await expect(toArray(backend.toJson("no-such-file"))).rejects.toThrow(/ENOENT: no such file or directory, stat '.*no-such-file'/);
});

test.each([0, 2, 10, "\t", "###"])('Test space %s', async function (space) {
jest.mocked(jsonSpace).mockReturnValue(space);

const json = (await toArray(backend.toJson(path.join(workspace, `small.parquet`)))).map(line => line.trim());
const records = await toArray(createInterface({input: createReadStream(path.join(workspace, `small.json`))}));
const records = await toArray(createInterface({ input: createReadStream(path.join(workspace, `small.json`)) }));
const expected = records.map(record => JSON.stringify(JSON.parse(record), null, space));

expect(json).toEqual(expected);
});

test("cancellation", async function () {
const token = {
get isCancellationRequested() {
return this.isCancellationRequestedMock();
},
isCancellationRequestedMock: jest.fn().mockReturnValueOnce(false).mockReturnValue(true),
onCancellationRequested: jest.fn()
};
expect(await toArray(backend.toJson(path.join(workspace, `small.parquet`), token))).toHaveLength(1);
expect(token.isCancellationRequestedMock).toBeCalledTimes(2);
});
});

0 comments on commit 1e5a261

Please sign in to comment.