Skip to content

Commit

Permalink
feat: make formatters node streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dvirtz committed Aug 20, 2024
1 parent 3ef8701 commit 20c90b5
Show file tree
Hide file tree
Showing 21 changed files with 169 additions and 166 deletions.
1 change: 0 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"<node_internals>/**"
],
"type": "node",
// "runtimeExecutable": "tsx",
"runtimeArgs": [
"--import",
"tsx",
Expand Down
17 changes: 17 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@
"presentation": {
"clear": true
}
},
{
"type": "npm",
"script": "unit",
"problemMatcher": [],
"label": "npm: unit",
"detail": "node --import tsx --import ./test/unit/automocks.ts --test test/unit/*.test.ts",
"presentation": {
"clear": true
}
},
{
"type": "npm",
"script": "lint",
"problemMatcher": [],
"label": "npm: lint",
"detail": "eslint --max-warnings=0 \"**/*.ts\""
}
]
}
40 changes: 16 additions & 24 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"."
],
"engines": {
"node": ">=19.0.0",
"node": ">=21.0.0",
"vscode": "^1.82.0"
},
"categories": [
Expand Down Expand Up @@ -205,7 +205,7 @@
"@semantic-release/changelog": "^6.0.3",
"@semantic-release/git": "^10.0.1",
"@types/glob": "^8.1.0",
"@types/node": "^20.14.12",
"@types/node": "^22.4.0",
"@types/split2": "^4.2.3",
"@types/unzipper": "^0.10.8",
"@types/vscode": "^1.46.0",
Expand All @@ -229,7 +229,6 @@
"unzipper": "^0.10.14"
},
"dependencies": {
"@async-generators/to-array": "^0.1.0",
"@dvirtz/parquets": "^0.11.6",
"@vscode-logging/logger": "^1.2.3",
"@vscode-logging/wrapper": "^1.0.1",
Expand Down
8 changes: 3 additions & 5 deletions src/backends/arrow-cpp-backend.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { AsyncRecordBatchStreamReader } from "apache-arrow";
import { AsyncRecordBatchStreamReader } from 'apache-arrow';
import { PassThrough, Readable } from "node:stream";
import { recordBatchTransform } from './record-batch-transform';


export async function arrowCppBackend(path: string): Promise<Readable> {
export async function arrowCppBackend(path: string, signal?: AbortSignal): Promise<Readable> {
const readParquet = await (async () => {
try {
const module = await import("parquet-reader");
Expand All @@ -14,7 +14,5 @@ export async function arrowCppBackend(path: string): Promise<Readable> {
})();
const stream = new PassThrough;
readParquet(path, stream);
return (await AsyncRecordBatchStreamReader.from(stream)).toNodeStream().pipe(recordBatchTransform());
return recordBatchTransform((await AsyncRecordBatchStreamReader.from(stream)).toNodeStream(), signal);
}


42 changes: 21 additions & 21 deletions src/backends/parquet-backend-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ import { parquetWasmBackend } from './parquet-wasm-backend';
import { parquetsBackend } from './parquets-backend';

export async function createParquetBackend(backend: BackendName, path: string, token?: CancellationToken): Promise<Readable> {
getLogger().info(`opening ${path} using ${backend} backend`);
const reader = await (async () => {
switch (backend) {
case 'parquet-tools':
return await parquetToolsBackend(path, token);
case 'parquets':
return await parquetsBackend(path);
case 'arrow':
return await arrowCppBackend(path);
case 'parquet-wasm':
return await parquetWasmBackend(path);
}
})();
const abortSignal = (() => {
if (token) {
const controller = new AbortController();

token?.onCancellationRequested(() => {
getLogger().info(`parsing ${path} was cancelled by user`);
reader.destroy();
});
token.onCancellationRequested(() => {
getLogger().info(`parsing ${path} was cancelled by user`);
controller.abort('user cancel');
});

return reader;
}
return controller.signal;
}
})();

export async function* generateParquetRows(backend: BackendName, path: string, token?: CancellationToken) {
yield* await createParquetBackend(backend, path, token);
getLogger().info(`opening ${path} using ${backend} backend`);
switch (backend) {
case 'parquet-tools':
return parquetToolsBackend(path, abortSignal);
case 'parquets':
return parquetsBackend(path, abortSignal);
case 'arrow':
return arrowCppBackend(path, abortSignal);
case 'parquet-wasm':
return parquetWasmBackend(path, abortSignal);
}
}
13 changes: 6 additions & 7 deletions src/backends/parquet-tools-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,30 @@ import * as vscode from 'vscode';
import { getLogger } from '../logger';
import { parquetTools as getParquetTools } from '../settings';

export async function parquetToolsBackend(path: string, token?: vscode.CancellationToken): Promise<Readable> {

return Readable.from(generateParquetRows(path, token));
export async function parquetToolsBackend(path: string, signal?: AbortSignal): Promise<Readable> {
return Readable.from(generateParquetRows(path, signal));
}

async function* generateParquetRows(path: string, token?: vscode.CancellationToken) {
async function* generateParquetRows(path: string, signal?: AbortSignal) {
const [command, ...args] = await parquetToolsPath();
getLogger().debug(`spawning ${command} cat -j ${path}`);
const childProcess = spawn(command, args.concat(['cat', '-j', path]));

token?.onCancellationRequested(_ => {
signal?.addEventListener('abort', _ => {
childProcess.kill();
});
let stderr = '';
childProcess.stderr.on('data', data => stderr += data);

yield* childProcess.stdout.pipe(split2(JSON.parse));
yield* childProcess.stdout.compose(split2(JSON.parse), signal && { signal: signal });

const exitCode = await new Promise<number>((resolve) => {
childProcess.on('exit', (code) => {
resolve(code || 0);
});
});

if (exitCode != 0 && !token?.isCancellationRequested) {
if (exitCode != 0 && !signal?.aborted) {
throw new Error(`parquet-tools exited with code ${exitCode}\n${stderr}`);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/backends/parquet-wasm-backend.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { AsyncRecordBatchStreamReader } from "apache-arrow";
import { AsyncRecordBatchStreamReader } from 'apache-arrow';
import { readFile } from "node:fs/promises";
import { Readable } from 'node:stream';
import { readParquet } from "parquet-wasm/node/arrow1";
import { recordBatchTransform } from './record-batch-transform';

export async function parquetWasmBackend(path: string): Promise<Readable> {
export async function parquetWasmBackend(path: string, signal?: AbortSignal): Promise<Readable> {
const data = new Uint8Array(await readFile(path));
const stream = readParquet(data);
return AsyncRecordBatchStreamReader.from(stream.intoIPCStream()).toNodeStream().pipe(recordBatchTransform());
return recordBatchTransform(AsyncRecordBatchStreamReader.from(stream.intoIPCStream()).toNodeStream(), signal);
}
4 changes: 2 additions & 2 deletions src/backends/parquets-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ async function* parquetsGenerator(path: string) {
await reader.close();
}

export async function parquetsBackend(path: string): Promise<Readable> {
return Readable.from(parquetsGenerator(path), { objectMode: true });
export async function parquetsBackend(path: string, signal?: AbortSignal): Promise<Readable> {
return Readable.from(parquetsGenerator(path), { objectMode: true, signal: signal });
}
11 changes: 8 additions & 3 deletions src/backends/record-batch-transform.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { RecordBatch } from 'apache-arrow';
import { Transform, TransformCallback } from 'node:stream';
import { Readable, Transform, TransformCallback, addAbortSignal } from 'node:stream';

export function recordBatchTransform() {
return new Transform({
export async function recordBatchTransform(reader: Readable, signal?: AbortSignal) {
const transform = new Transform({
objectMode: true, transform: function (chunk: RecordBatch, _encoding: string, callback: TransformCallback) {
for (const row of chunk) {
this.push(row);
}
callback();
}
});
const composed = reader.compose(transform);
if (signal) {
addAbortSignal(signal, composed);
}
return composed;
}
34 changes: 19 additions & 15 deletions src/formatters/csv-formatter.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import { csvSeparator } from '../settings';
import { Formatter } from "./formatter";

export class CsvFormatter implements Formatter {
async* format(lines: AsyncGenerator<object>): AsyncGenerator<string> {
const first = await lines.next();
if (first.value) {
yield this.generateHeader(first.value);
yield this.generateRow(first.value);
}
for await (const line of lines) {
yield this.generateRow(line);
}
export class CsvFormatter extends Transform {
private first = true;
private separator = csvSeparator();

constructor() {
super({
objectMode: true
});
}

format_error(message: string): string {
return message;
_transform(chunk: object, _encoding: BufferEncoding, callback: TransformCallback): void {
if (this.first) {
this.first = false;
this.push(this.generateHeader(chunk));
}
this.push(this.generateRow(chunk));
callback();
}

private generateHeader(line: object) {
return Object.keys(line).join(csvSeparator());
return Object.keys(line).join(this.separator);
}

private generateRow(line: object) {
return Object.values(line).join(csvSeparator());
return Object.values(line).join(this.separator);
}
}
4 changes: 2 additions & 2 deletions src/formatters/formatter-factory.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Transform } from 'node:stream';
import { format } from "../settings";
import { CsvFormatter } from "./csv-formatter";
import { Formatter } from "./formatter";
import { JsonFormatter } from "./json-formatter";

export function createFormatter(): Formatter {
export function createFormatter(): Transform {
switch (format()) {
case 'json':
return new JsonFormatter;
Expand Down
6 changes: 0 additions & 6 deletions src/formatters/formatter.ts

This file was deleted.

Loading

0 comments on commit 20c90b5

Please sign in to comment.