diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts index c209b84538bff..d94ac77725464 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts @@ -4,11 +4,16 @@ import type { INodeExecutionData, INodeType, INodeTypeDescription, + JsonObject, } from 'n8n-workflow'; -import { jsonParse } from 'n8n-workflow'; +import { jsonParse, NodeApiError } from 'n8n-workflow'; import omit from 'lodash/omit'; -import { elasticsearchApiRequest, elasticsearchApiRequestAllItems } from './GenericFunctions'; +import { + elasticsearchApiRequest, + elasticsearchApiRequestAllItems, + elasticsearchBulkApiRequest, +} from './GenericFunctions'; import { documentFields, documentOperations, indexFields, indexOperations } from './descriptions'; @@ -68,12 +73,14 @@ export class Elasticsearch implements INodeType { let responseData; + let bulkBody: IDataObject = {}; + for (let i = 0; i < items.length; i++) { + const bulkOperation = this.getNodeParameter('options.bulkOperation', i, false); if (resource === 'document') { // ********************************************************************** // document // ********************************************************************** - if (operation === 'delete') { // ---------------------------------------- // document: delete @@ -84,8 +91,17 @@ export class Elasticsearch implements INodeType { const indexId = this.getNodeParameter('indexId', i); const documentId = this.getNodeParameter('documentId', i); - const endpoint = `/${indexId}/_doc/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint); + if (bulkOperation) { + bulkBody[i] = JSON.stringify({ + delete: { + _index: indexId, + _id: documentId, + }, + }); + } else { + const endpoint = `/${indexId}/_doc/${documentId}`; + responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint); + } } else if (operation === 'get') { // ---------------------------------------- // document: get @@ -223,12 +239,22 @@ export class Elasticsearch implements INodeType { const indexId = this.getNodeParameter('indexId', i); const { documentId } = additionalFields; - if (documentId) { - const endpoint = `/${indexId}/_doc/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body); + if (bulkOperation) { + bulkBody[i] = JSON.stringify({ + index: { + _index: indexId, + _id: documentId, + }, + }); + bulkBody[i] += `\n${JSON.stringify(body)}`; } else { - const endpoint = `/${indexId}/_doc`; - responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + if (documentId) { + const endpoint = `/${indexId}/_doc/${documentId}`; + responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body); + } else { + const endpoint = `/${indexId}/_doc`; + responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + } } } else if (operation === 'update') { // ---------------------------------------- @@ -261,7 +287,17 @@ export class Elasticsearch implements INodeType { const documentId = this.getNodeParameter('documentId', i); const endpoint = `/${indexId}/_update/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + if (bulkOperation) { + bulkBody[i] = JSON.stringify({ + update: { + _index: indexId, + _id: documentId, + }, + }); + bulkBody[i] += `\n${JSON.stringify(body)}`; + } else { + responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + } } } else if (resource === 'index') { // ********************************************************************** @@ -341,13 +377,80 @@ export class Elasticsearch implements INodeType { } } } - const executionData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray(responseData as IDataObject[]), - { itemData: { item: i } }, - ); - returnData.push(...executionData); - } + if (!bulkOperation) { + const executionData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(responseData as IDataObject[]), + { itemData: { item: i } }, + ); + returnData.push(...executionData); + } + if (Object.keys(bulkBody).length >= 50) { + responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; + for (let j = 0; j < responseData.length; j++) { + const itemData = responseData[j]; + if (itemData.error) { + const errorData = itemData.error as IDataObject; + const message = errorData.type as string; + const description = errorData.reason as string; + const itemIndex = parseInt(Object.keys(bulkBody)[j]); + if (this.continueOnFail()) { + returnData.push( + ...this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray({ error: message, message: itemData.error }), + { itemData: { item: itemIndex } }, + ), + ); + continue; + } else { + throw new NodeApiError(this.getNode(), { + message, + description, + itemIndex, + } as JsonObject); + } + } + const executionData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(itemData), + { itemData: { item: parseInt(Object.keys(bulkBody)[j]) } }, + ); + returnData.push(...executionData); + } + bulkBody = {}; + } + } + if (Object.keys(bulkBody).length) { + responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; + for (let j = 0; j < responseData.length; j++) { + const itemData = responseData[j]; + if (itemData.error) { + const errorData = itemData.error as IDataObject; + const message = errorData.type as string; + const description = errorData.reason as string; + const itemIndex = parseInt(Object.keys(bulkBody)[j]); + if (this.continueOnFail()) { + returnData.push( + ...this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray({ error: message, message: itemData.error }), + { itemData: { item: itemIndex } }, + ), + ); + continue; + } else { + throw new NodeApiError(this.getNode(), { + message, + description, + itemIndex, + } as JsonObject); + } + } + const executionData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(itemData), + { itemData: { item: parseInt(Object.keys(bulkBody)[j]) } }, + ); + returnData.push(...executionData); + } + } return [returnData]; } } diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts index ed013079f8cc6..642ce1484fe4f 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts @@ -2,13 +2,55 @@ import type { IExecuteFunctions, IDataObject, JsonObject, - IRequestOptions, + IHttpRequestOptions, IHttpRequestMethods, } from 'n8n-workflow'; import { NodeApiError } from 'n8n-workflow'; import type { ElasticsearchApiCredentials } from './types'; +export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: IDataObject) { + const { baseUrl, ignoreSSLIssues } = (await this.getCredentials( + 'elasticsearchApi', + )) as ElasticsearchApiCredentials; + + const bulkBody = Object.values(body).flat().join('\n') + '\n'; + + const options: IHttpRequestOptions = { + method: 'POST', + headers: { 'Content-Type': 'application/x-ndjson' }, + body: bulkBody, + url: `${baseUrl}/_bulk`, + skipSslCertificateValidation: ignoreSSLIssues, + returnFullResponse: true, + ignoreHttpStatusErrors: true, + }; + + const response = await this.helpers.httpRequestWithAuthentication.call( + this, + 'elasticsearchApi', + options, + ); + + if (response.statusCode > 299) { + if (this.continueOnFail()) { + return Object.values(body).map((_) => ({ error: response.body.error })); + } else { + throw new NodeApiError(this.getNode(), { error: response.body.error } as JsonObject); + } + } + + return response.body.items.map((item: IDataObject) => { + return { + ...(item.index as IDataObject), + ...(item.update as IDataObject), + ...(item.create as IDataObject), + ...(item.delete as IDataObject), + ...(item.error as IDataObject), + }; + }); +} + export async function elasticsearchApiRequest( this: IExecuteFunctions, method: IHttpRequestMethods, @@ -20,13 +62,13 @@ export async function elasticsearchApiRequest( 'elasticsearchApi', )) as ElasticsearchApiCredentials; - const options: IRequestOptions = { + const options: IHttpRequestOptions = { method, body, qs, - uri: `${baseUrl}${endpoint}`, + url: `${baseUrl}${endpoint}`, json: true, - rejectUnauthorized: !ignoreSSLIssues, + skipSslCertificateValidation: ignoreSSLIssues, }; if (!Object.keys(body).length) { diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts index 86cefdf4bd16a..3a446243ace73 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts @@ -81,6 +81,28 @@ export const documentFields: INodeProperties[] = [ }, }, }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + displayOptions: { + show: { + resource: ['document'], + operation: ['delete'], + }, + }, + options: [ + { + displayName: 'Bulk Delete', + name: 'bulkOperation', + type: 'boolean', + default: false, + description: 'Whether to use the bulk operation to delete the document/s', + }, + ], + }, // ---------------------------------------- // document: get @@ -644,6 +666,13 @@ export const documentFields: INodeProperties[] = [ }, }, options: [ + { + displayName: 'Bulk Create', + name: 'bulkOperation', + type: 'boolean', + default: false, + description: 'Whether to use the bulk operation to create the document/s', + }, { displayName: 'Pipeline ID', name: 'pipeline', @@ -802,6 +831,13 @@ export const documentFields: INodeProperties[] = [ }, }, options: [ + { + displayName: 'Bulk Update', + name: 'bulkOperation', + type: 'boolean', + default: false, + description: 'Whether to use the bulk operation to update the document/s', + }, { displayName: 'Refresh', name: 'refresh',