[ML] File data viz limiting uploaded doc chunk size (#44768)
* [ML] File data viz limiting upload chunk size

* adding comments

* refactor

* fixing incorrect overwrite of array
jgowdyelastic authored Sep 5, 2019
1 parent cd968d2 commit 7d22b74
Showing 1 changed file with 43 additions and 7 deletions.
@@ -10,7 +10,8 @@ import { chunk } from 'lodash';
 import moment from 'moment';
 import { i18n } from '@kbn/i18n';

-const CHUNK_SIZE = 10000;
+const CHUNK_SIZE = 5000;
+const MAX_CHUNK_CHAR_COUNT = 1000000;
 const IMPORT_RETRIES = 5;

 export class Importer {
@@ -21,6 +22,7 @@ export class Importer {

     this.data = [];
     this.docArray = [];
+    this.docSizeArray = [];
   }

   async initializeImport(index) {
@@ -58,7 +60,7 @@
       };
     }

-    const chunks = chunk(this.docArray, CHUNK_SIZE);
+    const chunks = createDocumentChunks(this.docArray);

     const ingestPipeline = {
       id: pipelineId,
@@ -86,13 +88,18 @@
     };

     while (resp.success === false && retries > 0) {
-      resp = await ml.fileDatavisualizer.import(aggs);
+      try {
+        resp = await ml.fileDatavisualizer.import(aggs);

-      if (retries < IMPORT_RETRIES) {
-        console.log(`Retrying import ${IMPORT_RETRIES - retries}`);
-      }
+        if (retries < IMPORT_RETRIES) {
+          console.log(`Retrying import ${IMPORT_RETRIES - retries}`);
+        }

-      retries--;
+        retries--;
+      } catch (err) {
+        resp = { success: false, error: err };
+        retries = 0;
+      }
     }

     if (resp.success) {
Expand Down Expand Up @@ -152,3 +159,32 @@ function updatePipelineTimezone(ingestPipeline) {
}
}
}

function createDocumentChunks(docArray) {
const chunks = [];
// chop docArray into 5000 doc chunks
const tempChunks = chunk(docArray, CHUNK_SIZE);

// loop over tempChunks and check that the total character length
// for each chunk is within the MAX_CHUNK_CHAR_COUNT.
// if the length is too long, split the chunk into smaller chunks
// based on how much larger it is than MAX_CHUNK_CHAR_COUNT
// note, each document is a different size, so dividing by charCountOfDocs
// only produces an average chunk size that should be smaller than the max length
for (let i = 0; i < tempChunks.length; i++) {
const docs = tempChunks[i];
const numberOfDocs = docs.length;

const charCountOfDocs = JSON.stringify(docs).length;
if (charCountOfDocs > MAX_CHUNK_CHAR_COUNT) {
// calculate new chunk size which should produce a chunk
// who's length is on average around MAX_CHUNK_CHAR_COUNT
const adjustedChunkSize = Math.floor((MAX_CHUNK_CHAR_COUNT / charCountOfDocs) * numberOfDocs);
const smallerChunks = chunk(docs, adjustedChunkSize);
chunks.push(...smallerChunks);
} else {
chunks.push(docs);
}
}
return chunks;
}
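
For a sense of how the splitting behaves, here is a small standalone sketch (not part of the commit, using made-up numbers) of the same chunk-size arithmetic: a 5000-document chunk that serializes to about 2.5 million characters is re-split into pieces of roughly 2000 documents, so each piece averages out near the 1,000,000-character cap.

// Illustration only: hypothetical numbers, not real uploaded documents.
const MAX_CHUNK_CHAR_COUNT = 1000000;

const numberOfDocs = 5000;        // documents in one CHUNK_SIZE chunk
const charCountOfDocs = 2500000;  // assumed JSON.stringify(docs).length for that chunk

// Same formula as createDocumentChunks above: scale the document count down
// by the ratio of the character cap to the chunk's actual character count.
const adjustedChunkSize = Math.floor((MAX_CHUNK_CHAR_COUNT / charCountOfDocs) * numberOfDocs);

console.log(adjustedChunkSize);
// 2000 -> chunk(docs, 2000) would re-split the 5000 documents into pieces of
// 2000, 2000 and 1000, bringing each piece's average size back near the cap.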
