Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework: query return type as a proper async iterator #43

Merged
merged 3 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ const trino = new Trino({
auth: new BasicAuth('test'),
});

const queryIter = await trino.query('select * from customer limit 100');
const data = await queryIter.fold<QueryData[]>([], (row, acc) => [
...acc,
...(row.data ?? []),
]);
const iter = await trino.query('select * from customer limit 100');
const data = await iter
.map(r => r.data ?? [])
.fold<QueryData[]>([], (row, acc) => [...acc, ...row]);
```

More usage [examples](https://github.com/regadas/trino-js-client/blob/main/tests/it/client.spec.ts) can be found in the [integration tests](https://github.com/regadas/trino-js-client/blob/main/tests/it/client.spec.ts).
Expand Down
133 changes: 74 additions & 59 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class Client {
* @param cfg - AxiosRequestConfig<any>
* @returns The response data.
*/
async request<T>(cfg: AxiosRequestConfig<any>): Promise<T> {
async request<T>(cfg: AxiosRequestConfig<unknown>): Promise<T> {
return axios
.create(this.clientConfig)
.request(cfg)
Expand Down Expand Up @@ -239,7 +239,7 @@ class Client {
* @param {Query | string} query - The query to execute.
* @returns A promise that resolves to a QueryResult object.
*/
async query(query: Query | string): Promise<QueryResult> {
async query(query: Query | string): Promise<Iterator<QueryResult>> {
const req = typeof query === 'string' ? {query} : query;
const headers: Headers = {
[TRINO_USER_HEADER]: req.user,
Expand All @@ -250,13 +250,15 @@ class Client {
req.extraCredential ?? {}
),
};

return this.request({
const requestConfig = {
method: 'POST',
url: '/v1/statement',
data: req.query,
headers: cleanHeaders(headers),
});
};
return this.request<QueryResult>(requestConfig).then(
result => new Iterator(new QueryIterator(this, result))
);
}

/**
Expand All @@ -280,15 +282,76 @@ class Client {
}
}

export class Iterator<T> implements AsyncIterableIterator<T> {
constructor(private readonly iter: AsyncIterableIterator<T>) {}

[Symbol.asyncIterator](): AsyncIterableIterator<T> {
return this;
}

next(): Promise<IteratorResult<T>> {
return this.iter.next();
}

/**
* Calls a defined callback function on each QueryResult, and returns an array that contains the results.
* @param fn A function that accepts a QueryResult. map calls the fn function one time for each QueryResult.
*/
map<B>(fn: (t: T) => B): Iterator<B> {
const that: AsyncIterableIterator<T> = this.iter;
const asyncIterableIterator: AsyncIterableIterator<B> = {
[Symbol.asyncIterator]: () => asyncIterableIterator,
async next() {
return that.next().then(result => {
return <IteratorResult<B>>{
value: fn(result.value),
done: result.done,
};
});
},
};
return new Iterator(asyncIterableIterator);
}

/**
* Performs the specified action for each element.
* @param fn A function that accepts a QueryResult. forEach calls the fn function one time for each QueryResult.
*/
async forEach(fn: (value: T) => void): Promise<void> {
for await (const value of this) {
fn(value);
}
}

/**
* Calls a defined callback function on each QueryResult. The return value of the callback function is the accumulated
* result, and is provided as an argument in the next call to the callback function.
* @param acc The initial value of the accumulator.
* @param fn A function that accepts a QueryResult and accumulator, and returns an accumulator.
*/
async fold<B>(acc: B, fn: (value: T, acc: B) => B): Promise<B> {
await this.forEach(value => (acc = fn(value, acc)));
return acc;
}
}

/**
* Iterator for the query result data.
*/
export class QueryIterator {
export class QueryIterator implements AsyncIterableIterator<QueryResult> {
constructor(
private readonly client: Client,
private queryResult: QueryResult
) {}

[Symbol.asyncIterator](): AsyncIterableIterator<QueryResult> {
return this;
}

async foobar(): Promise<number> {
return Promise.resolve(1);
}

/**
* It returns true if the queryResult object has a nextUri property, and false otherwise
* @returns A boolean value.
Expand All @@ -302,9 +365,9 @@ export class QueryIterator {
* results and the query reached a completion state, successful or failure.
* @returns The next set of results.
*/
async next(): Promise<QueryResult> {
async next(): Promise<IteratorResult<QueryResult>> {
if (!this.hasNext()) {
return this.queryResult;
return Promise.resolve({value: this.queryResult, done: true});
}

this.queryResult = await this.client.request<QueryResult>({
Expand All @@ -318,53 +381,7 @@ export class QueryIterator {
}
}

return this.queryResult;
}

/**
* Closes the iterator which in reallity cancels the running query.
* @returns The query result with the id of the cancelled query.
*/
async close(): Promise<QueryResult> {
this.queryResult = await this.client.cancel(this.queryResult.id);
return this.queryResult;
}

/**
* Performs the specified action for each element.
* @param fn A function that accepts a QueryResult. forEach calls the fn function one time for each QueryResult.
*/
async forEach(fn: (queryResult: QueryResult) => void): Promise<void> {
try {
while (this.hasNext()) {
await this.next();
fn(this.queryResult);
}
} finally {
await this.close();
}
}

/**
* Calls a defined callback function on each QueryResult, and returns an array that contains the results.
* @param fn A function that accepts a QueryResult. map calls the fn function one time for each QueryResult.
*/
async map<T>(fn: (queryResult: QueryResult) => T): Promise<T[]> {
return this.fold(<T[]>[], (qr, acc) => {
acc.push(fn(qr));
return acc;
});
}

/**
* Calls a defined callback function on each QueryResult. The return value of the callback function is the accumulated
* result, and is provided as an argument in the next call to the callback function.
* @param acc The initial value of the accumulator.
* @param fn A function that accepts a QueryResult and accumulator, and returns an accumulator.
*/
async fold<T>(acc: T, fn: (row: QueryResult, acc: T) => T): Promise<T> {
await this.forEach(row => (acc = fn(row, acc)));
return acc;
return Promise.resolve({value: this.queryResult, done: false});
}
}

Expand All @@ -383,10 +400,8 @@ export class Trino {
* @param query - The query to execute.
* @returns A QueryIterator object.
*/
async query(query: Query | string): Promise<QueryIterator> {
return this.client
.query(query)
.then(resp => new QueryIterator(this.client, resp));
async query(query: Query | string): Promise<Iterator<QueryResult>> {
return this.client.query(query);
}

/**
Expand Down
39 changes: 19 additions & 20 deletions tests/it/client.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {BasicAuth, QueryData, Trino} from '../../src';
import {BasicAuth, QueryData, QueryResult, Iterator, Trino} from '../../src';

const allCustomerQuery = 'select * from customer';
const limit = 1;
Expand All @@ -18,11 +18,11 @@ describe('trino', () => {
schema: 'sf100000',
auth: new BasicAuth('test'),
});
const query = await trino.query(singleCustomerQuery);
const data = await query.fold<QueryData[]>([], (row, acc) => [
...acc,
...(row.data ?? []),
]);

const iter = await trino.query(singleCustomerQuery);
const data = await iter
.map(r => r.data ?? [])
.fold<QueryData[]>([], (row, acc) => [...acc, ...row]);

expect(data).toHaveLength(limit);
});
Expand All @@ -35,9 +35,9 @@ describe('trino', () => {
});
const query = await trino.query(allCustomerQuery);
const qr = await query.next();
await query.close();
await trino.cancel(qr.value.id);

const info = await trino.queryInfo(qr.id);
const info = await trino.queryInfo(qr.value.id);

expect(info.state).toBe('FAILED');
});
Expand All @@ -51,8 +51,8 @@ describe('trino', () => {
const query = await trino.query(allCustomerQuery);
const qr = await query.next();

await trino.cancel(qr.id);
const info = await trino.queryInfo(qr.id);
await trino.cancel(qr.value.id);
const info = await trino.queryInfo(qr.value.id);

expect(info.state).toBe('FAILED');
});
Expand All @@ -65,23 +65,22 @@ describe('trino', () => {
});
const query = await trino.query(singleCustomerQuery);
const qr = await query.next();
await query.close();
await trino.cancel(qr.value.id);

const info = await trino.queryInfo(qr.id);
const info = await trino.queryInfo(qr.value.id);
expect(info.query).toBe(singleCustomerQuery);
});

test.concurrent('query request header propagation', async () => {
const trino = new Trino({catalog: 'tpcds', auth: new BasicAuth('test')});
const query = await trino.query(useSchemaQuery);
await query.next();
await query.close();

const sqr = await trino.query(singleCustomerQuery);
const qr = await sqr.next();
await sqr.close();
await trino.cancel(qr.value.id);

const info = await trino.queryInfo(qr.id);
const info = await trino.queryInfo(qr.value.id);
expect(info.query).toBe(singleCustomerQuery);
});

Expand All @@ -93,12 +92,12 @@ describe('trino', () => {
});
const sqr = await trino.query('select * from foobar where id = -1');
const qr = await sqr.next();
expect(qr.error).toBeDefined();
expect(qr.error?.message).toBe(
expect(qr.value.error).toBeDefined();
expect(qr.value.error?.message).toBe(
"line 1:15: Table 'tpcds.sf100000.foobar' does not exist"
);

await sqr.close();
await trino.cancel(qr.value.id);
});

test.concurrent('QueryInfo has failure info', async () => {
Expand All @@ -110,9 +109,9 @@ describe('trino', () => {

const sqr = await trino.query('select * from foobar where id = -1');
const qr = await sqr.next();
await sqr.close();
await trino.cancel(qr.value.id);

const info = await trino.queryInfo(qr.id);
const info = await trino.queryInfo(qr.value.id);
expect(info.state).toBe('FAILED');
expect(info.failureInfo?.message).toBe(
"line 1:15: Table 'tpcds.sf100000.foobar' does not exist"
Expand Down