Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(Google BigQuery Node): Send timeoutMs in query, pagination support #10205

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jest.mock('../../../v2/transport', () => {
const originalModule = jest.requireActual('../../../v2/transport');
return {
...originalModule,
googleApiRequest: jest.fn(async (method: IHttpRequestMethods, resource: string) => {
googleBigQueryApiRequest: jest.fn(async (method: IHttpRequestMethods, resource: string) => {
if (resource === '/v2/projects/test-project/jobs' && method === 'POST') {
return {
jobReference: {
Expand All @@ -25,7 +25,7 @@ jest.mock('../../../v2/transport', () => {
return {};
}
}),
// googleApiRequestAllItems: jest.fn(async () => {}),
googleBigQueryApiRequestAllItems: jest.fn(async () => ({ rows: [], schema: {} })),
};
});

Expand All @@ -47,8 +47,9 @@ describe('Test Google BigQuery V2, executeQuery', () => {
const testNode = async (testData: WorkflowTestData, types: INodeTypes) => {
const { result } = await executeWorkflow(testData, types);

expect(transport.googleApiRequest).toHaveBeenCalledTimes(2);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledTimes(1);
expect(transport.googleBigQueryApiRequestAllItems).toHaveBeenCalledTimes(1);
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledWith(
'POST',
'/v2/projects/test-project/jobs',
{
Expand All @@ -60,11 +61,11 @@ describe('Test Google BigQuery V2, executeQuery', () => {
},
},
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
expect(transport.googleBigQueryApiRequestAllItems).toHaveBeenCalledWith(
'GET',
'/v2/projects/test-project/queries/job_123',
undefined,
{},
{ location: undefined, maxResults: 1000, timeoutMs: 10000 },
);

expect(result.finished).toEqual(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jest.mock('../../../v2/transport', () => {
const originalModule = jest.requireActual('../../../v2/transport');
return {
...originalModule,
googleApiRequest: jest.fn(async (method: IHttpRequestMethods, resource: string) => {
googleBigQueryApiRequest: jest.fn(async (method: IHttpRequestMethods, resource: string) => {
if (
resource ===
'/v2/projects/test-project/datasets/bigquery_node_dev_test_dataset/tables/num_text' &&
Expand Down Expand Up @@ -55,13 +55,13 @@ describe('Test Google BigQuery V2, insert auto map', () => {
const testNode = async (testData: WorkflowTestData, types: INodeTypes) => {
const { result } = await executeWorkflow(testData, types);

expect(transport.googleApiRequest).toHaveBeenCalledTimes(2);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledTimes(2);
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledWith(
'GET',
'/v2/projects/test-project/datasets/bigquery_node_dev_test_dataset/tables/num_text',
{},
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledWith(
'POST',
'/v2/projects/test-project/datasets/bigquery_node_dev_test_dataset/tables/num_text/insertAll',
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jest.mock('../../../v2/transport', () => {
const originalModule = jest.requireActual('../../../v2/transport');
return {
...originalModule,
googleApiRequest: jest.fn(async (method: IHttpRequestMethods, resource: string) => {
googleBigQueryApiRequest: jest.fn(async (method: IHttpRequestMethods, resource: string) => {
if (
resource ===
'/v2/projects/test-project/datasets/bigquery_node_dev_test_dataset/tables/test_json' &&
Expand Down Expand Up @@ -56,13 +56,13 @@ describe('Test Google BigQuery V2, insert define manually', () => {
const testNode = async (testData: WorkflowTestData, types: INodeTypes) => {
const { result } = await executeWorkflow(testData, types);

expect(transport.googleApiRequest).toHaveBeenCalledTimes(2);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledTimes(2);
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledWith(
'GET',
'/v2/projects/test-project/datasets/bigquery_node_dev_test_dataset/tables/test_json',
{},
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
expect(transport.googleBigQueryApiRequest).toHaveBeenCalledWith(
'POST',
'/v2/projects/test-project/datasets/bigquery_node_dev_test_dataset/tables/test_json/insertAll',
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { ApplicationError, NodeOperationError, sleep } from 'n8n-workflow';
import type { ResponseWithJobReference } from '../../helpers/interfaces';

import { prepareOutput } from '../../helpers/utils';
import { googleApiRequest } from '../../transport';
import { googleBigQueryApiRequestAllItems, googleBigQueryApiRequest } from '../../transport';
import { getResolvables, updateDisplayOptions } from '@utils/utilities';

const properties: INodeProperties[] = [
Expand Down Expand Up @@ -108,18 +108,21 @@ const properties: INodeProperties[] = [
'Limits the bytes billed for this query. Queries with bytes billed above this limit will fail (without incurring a charge). String in <a href="https://developers.google.com/discovery/v1/type-format?utm_source=cloud.google.com&utm_medium=referral" target="_blank">Int64Value</a> format',
},
{
displayName: 'Max Results',
displayName: 'Max Results Per Page',
name: 'maxResults',
type: 'number',
default: 1000,
description: 'The maximum number of rows of data to return',
description:
'Maximum number of results to return per page of results. This is particularly useful when dealing with large datasets. It will not affect the total number of results returned, e.g. rows in a table. You can use LIMIT in your SQL query to limit the number of rows returned.',
},
{
displayName: 'Timeout',
name: 'timeoutMs',
type: 'number',
default: 10000,
description: 'How long to wait for the query to complete, in milliseconds',
hint: 'How long to wait for the query to complete, in milliseconds',
description:
'Specifies the maximum amount of time, in milliseconds, that the client is willing to wait for the query to complete. Be aware that the call is not guaranteed to wait for the specified timeout; it typically returns after around 200 seconds (200,000 milliseconds), even if the query is not complete.',
},
{
displayName: 'Raw Output',
Expand Down Expand Up @@ -154,19 +157,31 @@ const displayOptions = {
export const description = updateDisplayOptions(displayOptions, properties);

export async function execute(this: IExecuteFunctions): Promise<INodeExecutionData[]> {
// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query

const items = this.getInputData();
const length = items.length;

const returnData: INodeExecutionData[] = [];

let jobs = [];
let maxResults = 1000;
let timeoutMs = 10000;

for (let i = 0; i < length; i++) {
try {
let sqlQuery = this.getNodeParameter('sqlQuery', i) as string;
const options = this.getNodeParameter('options', i);

const options = this.getNodeParameter('options', i) as {
defaultDataset?: string;
dryRun?: boolean;
includeSchema?: boolean;
location?: string;
maximumBytesBilled?: string;
maxResults?: number;
timeoutMs?: number;
rawOutput?: boolean;
useLegacySql?: boolean;
};

const projectId = this.getNodeParameter('projectId', i, undefined, {
extractValue: true,
});
Expand All @@ -179,15 +194,25 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
let includeSchema = false;

if (options.rawOutput !== undefined) {
rawOutput = options.rawOutput as boolean;
rawOutput = options.rawOutput;
delete options.rawOutput;
}

if (options.includeSchema !== undefined) {
includeSchema = options.includeSchema as boolean;
includeSchema = options.includeSchema;
delete options.includeSchema;
}

if (options.maxResults) {
maxResults = options.maxResults;
delete options.maxResults;
}

if (options.timeoutMs) {
timeoutMs = options.timeoutMs;
delete options.timeoutMs;
}

const body: IDataObject = { ...options };

body.query = sqlQuery;
Expand All @@ -203,7 +228,8 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
body.useLegacySql = false;
}

const response: ResponseWithJobReference = await googleApiRequest.call(
//https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
const response: ResponseWithJobReference = await googleBigQueryApiRequest.call(
this,
'POST',
`/v2/projects/${projectId}/jobs`,
Expand All @@ -222,13 +248,14 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
}

const jobId = response?.jobReference?.jobId;
const raw = rawOutput || (options.dryRun as boolean) || false;
const raw = rawOutput || options.dryRun || false;
const location = options.location || response.jobReference.location;

if (response.status?.state === 'DONE') {
const qs = { location };
const qs = { location, maxResults, timeoutMs };

const queryResponse: IDataObject = await googleApiRequest.call(
//https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
const queryResponse: IDataObject = await googleBigQueryApiRequestAllItems.call(
this,
'GET',
`/v2/projects/${projectId}/queries/${jobId}`,
Expand Down Expand Up @@ -272,9 +299,13 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa

for (const job of jobs) {
try {
const qs = job.location ? { location: job.location } : {};
const qs: IDataObject = job.location ? { location: job.location } : {};

qs.maxResults = maxResults;
qs.timeoutMs = timeoutMs;

const response: IDataObject = await googleApiRequest.call(
//https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
const response: IDataObject = await googleBigQueryApiRequestAllItems.call(
this,
'GET',
`/v2/projects/${job.projectId}/queries/${job.jobId}`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import type { TableSchema } from '../../helpers/interfaces';
import { checkSchema, wrapData } from '../../helpers/utils';
import { googleApiRequest } from '../../transport';
import { googleBigQueryApiRequest } from '../../transport';
import { generatePairedItemData, updateDisplayOptions } from '@utils/utilities';

const properties: INodeProperties[] = [
Expand Down Expand Up @@ -178,7 +178,7 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
}

const schema = (
await googleApiRequest.call(
await googleBigQueryApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}`,
Expand Down Expand Up @@ -230,7 +230,7 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
const batch = rows.slice(i, i + batchSize);
body.rows = batch;

const responseData = await googleApiRequest.call(
const responseData = await googleBigQueryApiRequest.call(
this,
'POST',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}/insertAll`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { IDataObject, ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';
import { googleApiRequest } from '../transport';
import { googleBigQueryApiRequest } from '../transport';

export async function searchProjects(
this: ILoadOptionsFunctions,
Expand All @@ -10,7 +10,7 @@ export async function searchProjects(
pageToken: (paginationToken as string) || undefined,
};

const response = await googleApiRequest.call(this, 'GET', '/v2/projects', undefined, qs);
const response = await googleBigQueryApiRequest.call(this, 'GET', '/v2/projects', undefined, qs);

let { projects } = response;

Expand Down Expand Up @@ -45,7 +45,7 @@ export async function searchDatasets(
pageToken: (paginationToken as string) || undefined,
};

const response = await googleApiRequest.call(
const response = await googleBigQueryApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets`,
Expand Down Expand Up @@ -87,7 +87,7 @@ export async function searchTables(
pageToken: (paginationToken as string) || undefined,
};

const response = await googleApiRequest.call(
const response = await googleBigQueryApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { IDataObject, ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';
import { googleApiRequest } from '../transport';
import { googleBigQueryApiRequest } from '../transport';

export async function getDatasets(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const projectId = this.getNodeParameter('projectId', undefined, {
extractValue: true,
});
const returnData: INodePropertyOptions[] = [];
const { datasets } = await googleApiRequest.call(
const { datasets } = await googleBigQueryApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets`,
Expand All @@ -33,7 +33,7 @@ export async function getSchema(this: ILoadOptionsFunctions): Promise<INodePrope

const returnData: INodePropertyOptions[] = [];

const { schema } = await googleApiRequest.call(
const { schema } = await googleBigQueryApiRequest.call(
this,
'GET',
`/v2/projects/${projectId}/datasets/${datasetId}/tables/${tableId}`,
Expand Down
17 changes: 9 additions & 8 deletions packages/nodes-base/nodes/Google/BigQuery/v2/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
import { NodeApiError, NodeOperationError } from 'n8n-workflow';
import { getGoogleAccessToken } from '../../../GenericFunctions';

export async function googleApiRequest(
export async function googleBigQueryApiRequest(
this: IExecuteFunctions | ILoadOptionsFunctions,
method: IHttpRequestMethods,
resource: string,
Expand Down Expand Up @@ -67,25 +67,26 @@ export async function googleApiRequest(
}
}

export async function googleApiRequestAllItems(
export async function googleBigQueryApiRequestAllItems(
this: IExecuteFunctions | ILoadOptionsFunctions,
propertyName: string,
method: IHttpRequestMethods,
endpoint: string,
body: IDataObject = {},
query: IDataObject = {},
) {
const returnData: IDataObject[] = [];
let rows: IDataObject[] = [];

let responseData;
query.maxResults = 10000;
if (query.maxResults === undefined) {
query.maxResults = 1000;
}

do {
responseData = await googleApiRequest.call(this, method, endpoint, body, query);
responseData = await googleBigQueryApiRequest.call(this, method, endpoint, body, query);

query.pageToken = responseData.pageToken;
returnData.push.apply(returnData, responseData[propertyName] as IDataObject[]);
rows = rows.concat((responseData.rows as IDataObject[]) ?? []);
} while (responseData.pageToken !== undefined && responseData.pageToken !== '');

return returnData;
return { ...(responseData || {}), rows };
}
Loading