From 27d59605876f9f3af2e7b71e0ea6d66c718b1013 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Mon, 22 Aug 2022 22:19:49 +0200 Subject: [PATCH 1/9] transform maxClauses fix Signed-off-by: Petar Dzepina --- .../transform/TransformRunner.kt | 67 +++- .../transform/TransformSearchService.kt | 12 +- .../indexmanagement/util/IndexUtils.kt | 1 + .../transform/TransformRunnerIT.kt | 348 ------------------ 4 files changed, 68 insertions(+), 360 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 07e38fb11..7cb8ef648 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -134,7 +134,7 @@ object TransformRunner : currentMetadata = validatedMetadata return } - if (transform.continuous && (bucketsToTransform.shardsToSearch == null || bucketsToTransform.currentShard != null)) { + if (transform.continuous) { // If we have not populated the list of shards to search, do so now if (bucketsToTransform.shardsToSearch == null) { // Note the timestamp when we got the shard global checkpoints to the user may know what data is included @@ -145,10 +145,22 @@ object TransformRunner : newGlobalCheckpoints ) } - bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform) - currentMetadata = bucketsToTransform.metadata + // If there are shards to search do it here + if (bucketsToTransform.currentShard != null) { + // Computes aggregation on modified documents for current shard to get modified buckets + getBucketsToTransformIteration(transform, bucketsToTransform).let { + (first, second) -> + bucketsToTransform = first + currentMetadata = second + } + currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, bucketsToTransform.modifiedBuckets) + currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) + bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata) + attemptedToIndex = true + } } else { currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets) + currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) attemptedToIndex = true } // we attempt to renew lock for every loop of transform @@ -176,6 +188,7 @@ object TransformRunner : continuousStats = ContinuousTransformStats(newGlobalCheckpointTime, null) ) } + logger.info("executeJob: writting metadata seqNo:${currentMetadata.seqNo}, primaryTerm:${currentMetadata.primaryTerm}") transformMetadataService.writeMetadata(currentMetadata, true) if (!transform.continuous || currentMetadata.status == TransformMetadata.Status.FAILED) { logger.info("Disabling the transform job ${transform.id}") @@ -186,9 +199,11 @@ object TransformRunner : } } - private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform { + private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): Pair { var currentBucketsToTransform = bucketsToTransform val currentShard = bucketsToTransform.currentShard + // Clear modified buckets from previous iteration + currentBucketsToTransform.modifiedBuckets.clear() if (currentShard != null) { val shardLevelModifiedBuckets = withTransformSecurityContext(transform) { @@ -217,7 +232,7 @@ object TransformRunner : 
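A note on the getBucketsToTransformIteration changes above, before the rest of this hunk: the new modifiedBuckets.clear() call is load-bearing because BucketsToTransform is threaded through the runner loop via data-class copy(), and copy() in Kotlin is shallow. The stand-in class below is hypothetical (the real BucketsToTransform also carries metadata and shard state); it only demonstrates the sharing behaviour that the explicit clear guards against.

    // Illustrative stand-in for BucketsToTransform, not the real class.
    data class Buckets(val modified: MutableSet<String> = mutableSetOf())

    fun main() {
        val first = Buckets().apply { modified.add("bucket-a") }
        val second = first.copy()    // shallow: both copies share one MutableSet
        second.modified.add("bucket-b")
        println(first.modified)      // [bucket-a, bucket-b] - visible through both
        first.modified.clear()       // the runner's per-iteration reset
        println(second.modified)     // [] - the clear empties every copy's view
    }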
currentBucketsToTransform.copy(currentShard = null) } } - return currentBucketsToTransform + return Pair(currentBucketsToTransform, currentBucketsToTransform.metadata) } private suspend fun validateTransform(transform: Transform, transformMetadata: TransformMetadata): TransformMetadata { @@ -227,6 +242,7 @@ object TransformRunner : return if (!validationResult.isValid) { val failureMessage = "Failed validation - ${validationResult.issues}" val failureMetadata = transformMetadata.copy(status = TransformMetadata.Status.FAILED, failureReason = failureMessage) + logger.info("validateFailed: writting metadata seqNo:${failureMetadata.seqNo}, primaryTerm:${failureMetadata.primaryTerm}") transformMetadataService.writeMetadata(failureMetadata, true) } else transformMetadata } @@ -241,14 +257,43 @@ object TransformRunner : metadata: TransformMetadata, modifiedBuckets: MutableSet> ): TransformMetadata { - val updatedMetadata = if (!transform.continuous || modifiedBuckets.isNotEmpty()) { + + val transformSearchResult = withTransformSecurityContext(transform) { + transformSearchService.executeCompositeSearch( + transform, + if (transform.continuous) null else metadata.afterKey, + if (transform.continuous) modifiedBuckets else null + ) + } + val indexTimeInMillis = withTransformSecurityContext(transform) { + transformIndexer.index(transformSearchResult.docsToIndex) + } + val afterKey = transformSearchResult.afterKey + val stats = transformSearchResult.stats + val updatedStats = stats.copy( + pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed, + indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis, + documentsIndexed = transformSearchResult.docsToIndex.size.toLong() + ) + return metadata.mergeStats(updatedStats).copy( + afterKey = afterKey, + lastUpdatedAt = Instant.now(), + status = if (afterKey == null) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED + ) + } + + private suspend fun recomputeModifiedBuckets( + transform: Transform, + metadata: TransformMetadata, + modifiedBuckets: MutableSet> + ): TransformMetadata { + val updatedMetadata = if (modifiedBuckets.isNotEmpty()) { val transformSearchResult = withTransformSecurityContext(transform) { - transformSearchService.executeCompositeSearch(transform, metadata.afterKey, if (transform.continuous) modifiedBuckets else null) + transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets) } val indexTimeInMillis = withTransformSecurityContext(transform) { transformIndexer.index(transformSearchResult.docsToIndex) } - val afterKey = transformSearchResult.afterKey val stats = transformSearchResult.stats val updatedStats = stats.copy( pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed, @@ -256,12 +301,12 @@ object TransformRunner : documentsIndexed = transformSearchResult.docsToIndex.size.toLong() ) metadata.mergeStats(updatedStats).copy( - afterKey = afterKey, lastUpdatedAt = Instant.now(), - status = if (afterKey == null && !transform.continuous) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED + status = TransformMetadata.Status.STARTED ) } else metadata.copy(lastUpdatedAt = Instant.now(), status = TransformMetadata.Status.STARTED) - return transformMetadataService.writeMetadata(updatedMetadata, true) + logger.info("recomputeModifiedBuckets: writting metadata seqNo:${updatedMetadata.seqNo}, primaryTerm:${updatedMetadata.primaryTerm}") + return updatedMetadata } private suspend fun withTransformSecurityContext(transform: Transform, block: 
suspend CoroutineScope.() -> T): T { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 57090c18c..73d8e24fa 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.transform.model.TransformSearchResult import org.opensearch.indexmanagement.transform.model.TransformStats import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS +import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize import org.opensearch.rest.RestStatus @@ -112,10 +113,11 @@ class TransformSearchService( suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map?, currentShard: ShardNewDocuments): BucketSearchResult { try { var retryAttempt = 0 + val pageSize = calculateMaxPageSize(transform) val searchResponse = backoffPolicy.retry(logger) { val pageSizeDecay = 2f.pow(retryAttempt++) client.suspendUntil { listener: ActionListener -> - val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt())) + val pageSize = max(1, pageSize.div(pageSizeDecay.toInt())) if (retryAttempt > 1) { logger.debug( "Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " + @@ -139,6 +141,14 @@ class TransformSearchService( } } + /** + * Apache Lucene has maxClauses limit which we could trip during recomputing of modified buckets + * due to matching too many bucket fields. 
To avoid this, we control how many buckets we recompute at a time(pageSize) + */ + private fun calculateMaxPageSize(transform: Transform): Int { + return minOf(transform.pageSize, LUCENE_MAX_CLAUSES / (transform.groups.size + 1)) + } + @Suppress("RethrowCaughtException") suspend fun executeCompositeSearch( transform: Transform, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index db1fe9e73..31a7ea367 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -33,6 +33,7 @@ class IndexUtils { const val SCHEMA_VERSION = "schema_version" const val DEFAULT_SCHEMA_VERSION = 1L const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#" + const val LUCENE_MAX_CLAUSES = 1024 private const val BYTE_ARRAY_SIZE = 16 private const val DOCUMENT_ID_SEED = 72390L diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index f1fafb996..0ba346daa 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -8,7 +8,6 @@ package org.opensearch.indexmanagement.transform import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity import org.opensearch.common.settings.Settings -import org.opensearch.index.query.TermQueryBuilder import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Histogram import org.opensearch.indexmanagement.common.model.dimension.Terms @@ -23,185 +22,11 @@ import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder -import java.lang.Integer.min import java.time.Instant import java.time.temporal.ChronoUnit class TransformRunnerIT : TransformRestTestCase() { - fun `test transform`() { - validateSourceIndex("transform-source-index") - - val transform = Transform( - id = "id_1", - schemaVersion = 1L, - enabled = true, - enabledAt = Instant.now(), - updatedAt = Instant.now(), - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - description = "test transform", - metadataId = null, - sourceIndex = "transform-source-index", - targetIndex = "transform-target-index", - roles = emptyList(), - pageSize = 100, - groups = listOf( - Terms(sourceField = "store_and_fwd_flag", targetField = "flag") - ) - ).let { createTransform(it, it.id) } - - updateTransformStartTime(transform) - - waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } - - val metadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) 
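Stepping back to the TransformSearchService change above: calculateMaxPageSize caps how many modified buckets a single recompute query may match, so the generated clauses stay under Lucene's 1024-clause ceiling, and the existing retry path then shrinks the page size further on failures. A standalone restatement of that arithmetic, assuming each recomputed bucket costs one clause per group field plus one:

    const val LUCENE_MAX_CLAUSES = 1024

    // Mirrors calculateMaxPageSize: (groups + 1) clauses per bucket must fit the budget.
    fun calculateMaxPageSize(requestedPageSize: Int, groupCount: Int): Int =
        minOf(requestedPageSize, LUCENE_MAX_CLAUSES / (groupCount + 1))

    // One retry step of the backoff in getShardLevelModifiedBuckets; in the patch the
    // shrunken value is fed back in, so successive retries compound (p, p/2, p/8, ...).
    fun shrinkOnRetry(pageSize: Int, retryAttempt: Int): Int =
        maxOf(1, pageSize / (1 shl retryAttempt))    // 1 shl n == 2^n, like 2f.pow(n)

    fun main() {
        println(calculateMaxPageSize(1000, 2))  // 341 - two Terms groups cap a 1000 page size
        println(calculateMaxPageSize(100, 1))   // 100 - small requests pass through unchanged
        println(shrinkOnRetry(341, 1))          // 170 - the first retry halves the page
    }

The same (groups + 1) formula briefly reappears in patch 2's TransportIndexTransformAction clamp before patch 3 confines it to search time.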
- assertEquals("Transform has not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - transformMetadata - } - - assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed) - assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed) - assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) - assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) - } - - fun `test transform with data filter`() { - validateSourceIndex("transform-source-index") - - val transform = Transform( - id = "id_2", - schemaVersion = 1L, - enabled = true, - enabledAt = Instant.now(), - updatedAt = Instant.now(), - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - description = "test transform", - metadataId = null, - sourceIndex = "transform-source-index", - targetIndex = "transform-target-index", - roles = emptyList(), - pageSize = 100, - groups = listOf( - Terms(sourceField = "store_and_fwd_flag", targetField = "flag") - ), - dataSelectionQuery = TermQueryBuilder("store_and_fwd_flag", "N") - ).let { createTransform(it, it.id) } - - updateTransformStartTime(transform) - - val metadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) - assertEquals("Transform has not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - transformMetadata - } - - assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed) - assertEquals("More than expected documents indexed", 1L, metadata.stats.documentsIndexed) - assertEquals("More than expected documents processed", 4977L, metadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) - assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) - } - - fun `test invalid transform`() { - // With invalid mapping - val transform = randomTransform().copy(enabled = true, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES)) - createTransform(transform, transform.id) - deleteIndex(transform.sourceIndex) - - updateTransformStartTime(transform) - - val metadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) 
- assertEquals("Transform has not failed", TransformMetadata.Status.FAILED, transformMetadata.status) - transformMetadata - } - - assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) - } - - fun `test transform with aggregations`() { - validateSourceIndex("transform-source-index") - - val aggregatorFactories = AggregatorFactories.builder() - aggregatorFactories.addAggregator(AggregationBuilders.sum("revenue").field("total_amount")) - aggregatorFactories.addAggregator(AggregationBuilders.max("min_fare").field("fare_amount")) - aggregatorFactories.addAggregator(AggregationBuilders.min("max_fare").field("fare_amount")) - aggregatorFactories.addAggregator(AggregationBuilders.avg("avg_fare").field("fare_amount")) - aggregatorFactories.addAggregator(AggregationBuilders.count("count").field("orderID")) - aggregatorFactories.addAggregator(AggregationBuilders.percentiles("passenger_distribution").percentiles(90.0, 95.0).field("passenger_count")) - aggregatorFactories.addAggregator( - ScriptedMetricAggregationBuilder("average_revenue_per_passenger_per_trip") - .initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.count = 0; state.sum = 0;", emptyMap())) - .mapScript( - Script( - ScriptType.INLINE, - Script.DEFAULT_SCRIPT_LANG, - "state.sum += doc[\"total_amount\"].value; state.count += doc[\"passenger_count\"].value", - emptyMap() - ) - ) - .combineScript( - Script( - ScriptType.INLINE, - Script.DEFAULT_SCRIPT_LANG, - "def d = new long[2]; d[0] = state.sum; d[1] = state.count; return d", - emptyMap() - ) - ) - .reduceScript( - Script( - ScriptType.INLINE, - Script.DEFAULT_SCRIPT_LANG, - "double sum = 0; double count = 0; for (a in states) { sum += a[0]; count += a[1]; } return sum/count", - emptyMap() - ) - ) - ) - - val transform = Transform( - id = "id_4", - schemaVersion = 1L, - enabled = true, - enabledAt = Instant.now(), - updatedAt = Instant.now(), - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - description = "test transform", - metadataId = null, - sourceIndex = "transform-source-index", - targetIndex = "transform-target-index", - roles = emptyList(), - pageSize = 1, - groups = listOf( - Terms(sourceField = "store_and_fwd_flag", targetField = "flag") - ), - aggregations = aggregatorFactories - ).let { createTransform(it, it.id) } - - updateTransformStartTime(transform) - - val metadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) 
- assertEquals("Transform has not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - transformMetadata - } - - assertEquals("More than expected pages processed", 3L, metadata.stats.pagesProcessed) - assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed) - assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) - assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) - } - fun `test transform with failure during indexing`() { validateSourceIndex("transform-source-index") @@ -777,179 +602,6 @@ class TransformRunnerIT : TransformRestTestCase() { disableTransform(transform.id) } - fun `test continuous transforms with null buckets`() { - val sourceIndex = "null-bucket-source-index" - createIndex(sourceIndex, Settings.EMPTY, """"properties":{"iterating_id":{"type":"integer"},"term_id":{"type":"keyword"},"twice_id":{"type":"integer"}}""") - for (i in 0..12) { - val jsonString = "{\"iterating_id\": \"$i\",\"term_id\": \"${i % 5}\",\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = jsonString) - val idNullJsonString = "{\"iterating_id\": null,\"term_id\": \"${i % 5}\",\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = idNullJsonString) - val termNullJsonString = "{\"iterating_id\": \"$i\",\"term_id\": null,\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = termNullJsonString) - val bothNullJsonString = "{\"iterating_id\": null,\"term_id\": null,\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = bothNullJsonString) - } - - val aggregatorFactories = AggregatorFactories.builder() - aggregatorFactories.addAggregator(AggregationBuilders.sum("twice_id_sum").field("twice_id")) - - val transform = Transform( - id = "id_12", - schemaVersion = 1L, - enabled = true, - enabledAt = Instant.now(), - updatedAt = Instant.now(), - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - description = "test transform", - metadataId = null, - sourceIndex = sourceIndex, - targetIndex = "null-bucket-target-index", - roles = emptyList(), - pageSize = 100, - groups = listOf( - Histogram(sourceField = "iterating_id", targetField = "id_group", interval = 5.0), - Terms(sourceField = "term_id", targetField = "id_term") - ), - continuous = true, - aggregations = aggregatorFactories - ).let { createTransform(it, it.id) } - - updateTransformStartTime(transform) - - waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } - - val firstIterationMetadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) 
- assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 52, transformMetadata.stats.documentsProcessed) - assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey) - transformMetadata - } - - assertEquals("Not the expected transform status", TransformMetadata.Status.STARTED, firstIterationMetadata.status) - assertEquals("Not the expected pages processed", 2L, firstIterationMetadata.stats.pagesProcessed) - assertEquals("Not the expected documents indexed", 22L, firstIterationMetadata.stats.documentsIndexed) - assertEquals("Not the expected documents processed", 52L, firstIterationMetadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", firstIterationMetadata.stats.indexTimeInMillis > 0) - assertTrue("Didn't capture search time", firstIterationMetadata.stats.searchTimeInMillis > 0) - - // Get all of the buckets - var hits = waitFor { - val response = client().makeRequest( - "GET", "${transform.targetIndex}/_search", - StringEntity("{\"size\": 25}", ContentType.APPLICATION_JSON) - ) - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - val responseHits = response.asMap().getValue("hits") as Map<*, *> - val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int - sum + docCount - } - assertEquals("Not all documents included in the transform target index", 52, totalDocs) - - responseHits["hits"] as ArrayList<*> - } - - // Validate the buckets include the correct information - hits.forEach { - val bucket = ((it as Map<*, *>)["_source"] as Map<*, *>) - val idGroup = (bucket["id_group"] as Double?)?.toInt() - val idTerm = (bucket["id_term"] as String?)?.toInt() - if (idGroup == null) { - if (idTerm == null) { - val expectedSum = (0..(24) step 2).sum() - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } else { - val expectedSum = ((idTerm * 2)..(24) step 10).sum() - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } - } else if (idTerm == null) { - // use the min to get the correct sum for the half full top bucket - val expectedSum = ((idGroup * 2)..(min(idGroup * 2 + 8, 24)) step 2).sum() - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } else { - val expectedSum = idGroup * 2 + idTerm * 2 - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } - } - - // Add more data, don't add any to the (null, null) bucket to check that it won't be updated without new data - for (i in 13..24) { - val jsonString = "{\"iterating_id\": \"$i\",\"term_id\": \"${i % 5}\",\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = jsonString) - val idNullJsonString = "{\"iterating_id\": null,\"term_id\": \"${i % 5}\",\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = idNullJsonString) - val termNullJsonString = "{\"iterating_id\": \"$i\",\"term_id\": null,\"twice_id\": \"${i * 2}\"}" - insertSampleData(sourceIndex, 1, jsonString = termNullJsonString) - } - - waitFor { - val documentsBehind = getTransformDocumentsBehind(transform.id)[transform.sourceIndex] - assertEquals("Documents behind not calculated correctly", 36, documentsBehind) - } - - updateTransformStartTime(transform) - - val secondIterationMetadata = 
waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) - assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 104L, transformMetadata.stats.documentsProcessed) - assertEquals("Transform did not have null afterKey after iteration", null, transformMetadata.afterKey) - transformMetadata - } - - assertEquals("Not the expected transform status", TransformMetadata.Status.STARTED, secondIterationMetadata.status) - assertEquals("More than expected pages processed", 4L, secondIterationMetadata.stats.pagesProcessed) - assertEquals("More than expected documents indexed", 42L, secondIterationMetadata.stats.documentsIndexed) - assertEquals("Not the expected documents processed", 104L, secondIterationMetadata.stats.documentsProcessed) - assertTrue("Doesn't capture indexed time", secondIterationMetadata.stats.indexTimeInMillis > firstIterationMetadata.stats.indexTimeInMillis) - assertTrue("Didn't capture search time", secondIterationMetadata.stats.searchTimeInMillis > firstIterationMetadata.stats.searchTimeInMillis) - - disableTransform(transform.id) - - hits = waitFor { - val response = client().makeRequest( - "GET", "${transform.targetIndex}/_search", - StringEntity("{\"size\": 40}", ContentType.APPLICATION_JSON) - ) - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - val responseHits = response.asMap().getValue("hits") as Map<*, *> - val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int - sum + docCount - } - assertEquals("Not all documents included in the transform target index", 88, totalDocs) - - responseHits["hits"] as ArrayList<*> - } - - // Validate the buckets include the correct information - hits.forEach { - val bucket = ((it as Map<*, *>)["_source"] as Map<*, *>) - val idGroup = (bucket["id_group"] as Double?)?.toInt() - val idTerm = (bucket["id_term"] as String?)?.toInt() - if (idGroup == null) { - if (idTerm == null) { - val expectedSum = (0..(24) step 2).sum() - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } else { - val expectedSum = ((idTerm * 2)..(48) step 10).sum() - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } - } else if (idTerm == null) { - // use the min to get the correct sum for the half full top bucket - val expectedSum = ((idGroup * 2)..(idGroup * 2 + 8) step 2).sum() - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } else { - val expectedSum = idGroup * 2 + idTerm * 2 - assertEquals("ID sum not calculated correctly", expectedSum, (bucket["twice_id_sum"] as Double).toInt()) - } - } - } - private fun getStrictMappings(): String { return """ "dynamic": "strict", From 807f9c7ab598cb19f89e8c276728e7e1f7e75869 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 23 Aug 2022 20:11:04 +0200 Subject: [PATCH 2/9] added bucket log to track processed buckets Signed-off-by: Petar Dzepina --- .../transform/TransformBucketLog.kt | 58 +++++++++++++++++++ .../transform/TransformRunner.kt | 14 +++-- .../index/TransportIndexTransformAction.kt | 8 ++- 3 files changed, 72 insertions(+), 8 deletions(-) create mode 100644 
src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt new file mode 100644 index 000000000..12e7e9dfc --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.apache.logging.log4j.LogManager +import java.math.BigInteger +import java.security.MessageDigest + +class TransformBucketLog { + + private val maxSize: Int = 100_000_000 + private val mutex = Mutex() + + private var processedBuckets: MutableSet = HashSet() + + private var logger = LogManager.getLogger(javaClass) + + suspend fun addBuckets(buckets: List>) { + buckets.forEach { + addBucket(it) + } + } + + suspend fun addBucket(bucket: Map) { + mutex.withLock { + if (processedBuckets.size >= maxSize) return + processedBuckets.add(computeBucketHash(bucket)) + } + } + + suspend fun isProcessed(bucket: Map): Boolean { + mutex.withLock { + return processedBuckets.contains(computeBucketHash(bucket)) + } + } + + suspend fun isNotProcessed(bucket: Map) = !isProcessed(bucket) + + suspend fun computeBucketHash(bucket: Map): String { + val md5Crypt = MessageDigest.getInstance("MD5") + bucket.entries.sortedBy { it.key }.also { + it.forEach { entry -> + md5Crypt.update(entry.value.toString().toByteArray()) + } + } + return BigInteger(1, md5Crypt.digest()).toString(16) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 7cb8ef648..3f416cce3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -112,8 +112,7 @@ object TransformRunner : TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY), TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT ) - - var attemptedToIndex = false + var transformBucketLog = TransformBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) var lock = acquireLockForScheduledJob(transform, context, backoffPolicy) try { @@ -153,15 +152,18 @@ object TransformRunner : bucketsToTransform = first currentMetadata = second } - currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, bucketsToTransform.modifiedBuckets) + + val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter { transformBucketLog.isNotProcessed(it) }.toMutableSet() + + transformBucketLog.addBuckets(modifiedBuckets.toList()) + + currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets) currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata) - attemptedToIndex = true } } else { currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets) currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) - attemptedToIndex = true } // we attempt to renew lock for every loop of transform val renewedLock = 
renewLockForScheduledJob(context, lock, backoffPolicy) @@ -171,7 +173,7 @@ object TransformRunner : lock = renewedLock } } - } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null || !attemptedToIndex) + } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null) } catch (e: Exception) { logger.error("Failed to execute the transform job [${transform.id}] because of exception [${e.localizedMessage}]", e) currentMetadata = currentMetadata.copy( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt index 373530386..ebdac4170 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt @@ -150,9 +150,13 @@ class TransportIndexTransformAction @Inject constructor( if (transform.continuous != newTransform.continuous) modified.add(Transform.CONTINUOUS_FIELD) return modified.toList() } - +// (groups + 1) * buckets = 1024 private fun putTransform() { - val transform = request.transform.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user) + val transform = request.transform.copy( + pageSize = minOf(request.transform.pageSize, 1024 / (request.transform.groups.size + 1)), + schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, + user = this.user + ) request.index(INDEX_MANAGEMENT_INDEX) .id(request.transform.id) .source(transform.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) From 8acfa3a5ac0a3b231ed3e75b0dd5318cc11e40a3 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 24 Aug 2022 15:47:01 +0200 Subject: [PATCH 3/9] various renames/changes Signed-off-by: Petar Dzepina --- .../transform/TransformBucketLog.kt | 58 -------------- .../transform/TransformProcessedBucketLog.kt | 43 ++++++++++ .../transform/TransformRunner.kt | 36 ++++----- .../transform/TransformSearchService.kt | 16 ++-- .../index/TransportIndexTransformAction.kt | 8 +- .../transform/TransformRunnerIT.kt | 79 +++++++++++++++++++ 6 files changed, 153 insertions(+), 87 deletions(-) delete mode 100644 src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt deleted file mode 100644 index 12e7e9dfc..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformBucketLog.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.transform - -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import org.apache.logging.log4j.LogManager -import java.math.BigInteger -import java.security.MessageDigest - -class TransformBucketLog { - - private val maxSize: Int = 100_000_000 - private val mutex = Mutex() - - private var processedBuckets: MutableSet = HashSet() - - private var logger = LogManager.getLogger(javaClass) - - suspend fun addBuckets(buckets: List>) { - buckets.forEach { - 
addBucket(it) - } - } - - suspend fun addBucket(bucket: Map) { - mutex.withLock { - if (processedBuckets.size >= maxSize) return - processedBuckets.add(computeBucketHash(bucket)) - } - } - - suspend fun isProcessed(bucket: Map): Boolean { - mutex.withLock { - return processedBuckets.contains(computeBucketHash(bucket)) - } - } - - suspend fun isNotProcessed(bucket: Map) = !isProcessed(bucket) - - suspend fun computeBucketHash(bucket: Map): String { - val md5Crypt = MessageDigest.getInstance("MD5") - bucket.entries.sortedBy { it.key }.also { - it.forEach { entry -> - md5Crypt.update(entry.value.toString().toByteArray()) - } - } - return BigInteger(1, md5Crypt.digest()).toString(16) - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt new file mode 100644 index 000000000..4224ce232 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import java.math.BigInteger +import java.security.MessageDigest + +class TransformProcessedBucketLog { + + private val maxSize: Int = 100_000_000 + + private var processedBuckets: MutableSet = HashSet() + + fun addBuckets(buckets: List>) { + buckets.forEach { + addBucket(it) + } + } + + fun addBucket(bucket: Map) { + if (processedBuckets.size >= maxSize) return + processedBuckets.add(computeBucketHash(bucket)) + } + + fun isProcessed(bucket: Map): Boolean { + return processedBuckets.contains(computeBucketHash(bucket)) + } + + suspend fun isNotProcessed(bucket: Map) = !isProcessed(bucket) + + fun computeBucketHash(bucket: Map): String { + val md5Crypt = MessageDigest.getInstance("MD5") + bucket.entries.sortedBy { it.key }.also { + it.forEach { entry -> + md5Crypt.update(entry.value.toString().toByteArray()) + } + } + return BigInteger(1, md5Crypt.digest()).toString(16) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 3f416cce3..e7af36a95 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -112,7 +112,7 @@ object TransformRunner : TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY), TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT ) - var transformBucketLog = TransformBucketLog() + var transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) var lock = acquireLockForScheduledJob(transform, context, backoffPolicy) try { @@ -147,22 +147,23 @@ object TransformRunner : // If there are shards to search do it here if (bucketsToTransform.currentShard != null) { // Computes aggregation on modified documents for current shard to get modified buckets - getBucketsToTransformIteration(transform, bucketsToTransform).let { - (first, second) -> - bucketsToTransform = first - currentMetadata = second + bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also { + currentMetadata = it.metadata } - - val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter { transformBucketLog.isNotProcessed(it) }.toMutableSet() - - 
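An aside on the de-duplication these hunks introduce: the bucket log keeps an MD5 digest of each bucket's values, sorted by key so that logically equal buckets hash identically, and the runner filters every shard's modified buckets through it before recomputing. A condensed sketch of that flow (the digest matches computeBucketHash; the filter-and-record step is simplified into one pass):

    import java.math.BigInteger
    import java.security.MessageDigest

    // Same digest scheme as computeBucketHash in the bucket log.
    fun bucketHash(bucket: Map<String, Any>): String {
        val md5 = MessageDigest.getInstance("MD5")
        bucket.entries.sortedBy { it.key }
            .forEach { md5.update(it.value.toString().toByteArray()) }
        return BigInteger(1, md5.digest()).toString(16)
    }

    fun main() {
        val processed = hashSetOf<String>()
        val shardBuckets = listOf(
            mapOf("flag" to "N"),
            mapOf("flag" to "Y"),
            mapOf("flag" to "N")   // the same bucket reported by a later shard
        )
        // Set.add returns false for a duplicate, so this filters and records at once.
        val toRecompute = shardBuckets.filter { processed.add(bucketHash(it)) }
        println(toRecompute.size)  // 2 - the repeated {flag=N} bucket is skipped
    }

The log is in-memory and capped, so once the cap is hit a duplicate bucket may be recomputed again; that costs an extra write of the same target documents but, as far as these hunks show, nothing incorrect.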
transformBucketLog.addBuckets(modifiedBuckets.toList()) - + // Filter out already processed buckets + val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter { transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet() + // Recompute modified buckets and update them in targetIndex currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets) + // Add processed buckets to 'processed set' so that we don't try to reprocess them again + transformProcessedBucketLog.addBuckets(modifiedBuckets.toList()) + // Update TransformMetadata currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata) } } else { - currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets) + // Computes buckets from source index and stores them in targetIndex as transform docs + currentMetadata = computeBucketsIteration(transform, currentMetadata) + // Update TransformMetadata currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) } // we attempt to renew lock for every loop of transform @@ -201,7 +202,7 @@ object TransformRunner : } } - private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): Pair { + private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform { var currentBucketsToTransform = bucketsToTransform val currentShard = bucketsToTransform.currentShard // Clear modified buckets from previous iteration @@ -234,7 +235,7 @@ object TransformRunner : currentBucketsToTransform.copy(currentShard = null) } } - return Pair(currentBucketsToTransform, currentBucketsToTransform.metadata) + return currentBucketsToTransform } private suspend fun validateTransform(transform: Transform, transformMetadata: TransformMetadata): TransformMetadata { @@ -254,17 +255,16 @@ object TransformRunner : * the range query will not precisely specify the modified buckets. As a result, we increase the range for the query and then filter out * the unintended buckets as part of the composite search step. 
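 * For example, with a date-based group the widened range can also match documents
 * that belong to neighbouring buckets; those unintended buckets are the ones dropped
 * when the composite response is filtered against the modified-bucket set.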
*/ - private suspend fun executeTransformIteration( + private suspend fun computeBucketsIteration( transform: Transform, metadata: TransformMetadata, - modifiedBuckets: MutableSet> ): TransformMetadata { val transformSearchResult = withTransformSecurityContext(transform) { transformSearchService.executeCompositeSearch( transform, - if (transform.continuous) null else metadata.afterKey, - if (transform.continuous) modifiedBuckets else null + metadata.afterKey, + null ) } val indexTimeInMillis = withTransformSecurityContext(transform) { @@ -273,7 +273,7 @@ object TransformRunner : val afterKey = transformSearchResult.afterKey val stats = transformSearchResult.stats val updatedStats = stats.copy( - pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed, + pagesProcessed = stats.pagesProcessed, indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis, documentsIndexed = transformSearchResult.docsToIndex.size.toLong() ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 73d8e24fa..610538019 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -113,11 +113,11 @@ class TransformSearchService( suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map?, currentShard: ShardNewDocuments): BucketSearchResult { try { var retryAttempt = 0 - val pageSize = calculateMaxPageSize(transform) + var pageSize = calculateMaxPageSize(transform) val searchResponse = backoffPolicy.retry(logger) { val pageSizeDecay = 2f.pow(retryAttempt++) client.suspendUntil { listener: ActionListener -> - val pageSize = max(1, pageSize.div(pageSizeDecay.toInt())) + pageSize = max(1, pageSize.div(pageSizeDecay.toInt())) if (retryAttempt > 1) { logger.debug( "Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " + @@ -142,8 +142,8 @@ class TransformSearchService( } /** - * Apache Lucene has maxClauses limit which we could trip during recomputing of modified buckets - * due to matching too many bucket fields. To avoid this, we control how many buckets we recompute at a time(pageSize) + * Apache Lucene has maxClauses limit which we could trip during recomputing of modified buckets(continuous transform) + * due to trying to match too many bucket fields. To avoid this, we control how many buckets we recompute at a time(pageSize) */ private fun calculateMaxPageSize(transform: Transform): Int { return minOf(transform.pageSize, LUCENE_MAX_CLAUSES / (transform.groups.size + 1)) @@ -156,12 +156,18 @@ class TransformSearchService( modifiedBuckets: MutableSet>? = null ): TransformSearchResult { try { + var pageSize: Int = + if (modifiedBuckets.isNullOrEmpty()) + transform.pageSize + else + modifiedBuckets.size + var retryAttempt = 0 val searchResponse = backoffPolicy.retry(logger) { // TODO: Should we store the value of the past successful page size (?) val pageSizeDecay = 2f.pow(retryAttempt++) client.suspendUntil { listener: ActionListener -> - val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt())) + pageSize = max(1, pageSize.div(pageSizeDecay.toInt())) if (retryAttempt > 1) { logger.debug( "Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. 
Attempting " + diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt index ebdac4170..373530386 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt @@ -150,13 +150,9 @@ class TransportIndexTransformAction @Inject constructor( if (transform.continuous != newTransform.continuous) modified.add(Transform.CONTINUOUS_FIELD) return modified.toList() } -// (groups + 1) * buckets = 1024 + private fun putTransform() { - val transform = request.transform.copy( - pageSize = minOf(request.transform.pageSize, 1024 / (request.transform.groups.size + 1)), - schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, - user = this.user - ) + val transform = request.transform.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user) request.index(INDEX_MANAGEMENT_INDEX) .id(request.transform.id) .source(transform.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 0ba346daa..723a7527b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -7,6 +7,8 @@ package org.opensearch.indexmanagement.transform import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity +import org.opensearch.client.Request +import org.opensearch.client.RequestOptions import org.opensearch.common.settings.Settings import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Histogram @@ -602,6 +604,69 @@ class TransformRunnerIT : TransformRestTestCase() { disableTransform(transform.id) } + fun `test continuous transform with a lot of buckets`() { + + val sourceIndex = "index_with_lots_of_buckets" + + var requestBody: StringBuilder = StringBuilder(100000) + for (i in 1..2000) { + var docPayload: String = """ + { + "id1": "$i", + "id2": "${i + 1}" + } + """.trimIndent().replace(Regex("[\n\r\\s]"), "") + + requestBody.append("{\"create\":{}}\n").append(docPayload).append('\n') + } + + createIndexAndBulkInsert(sourceIndex, Settings.EMPTY, null, null, requestBody.toString()) + + val transform = Transform( + id = "id_111", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = "index_with_lots_of_buckets", + targetIndex = "index_with_lots_of_buckets_transformed", + roles = emptyList(), + pageSize = 1000, + groups = listOf( + Terms(sourceField = "id1.keyword", targetField = "id1"), + Terms(sourceField = "id2.keyword", targetField = "id2") + ), + continuous = true + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } + + val firstIterationMetadata = waitFor { + val job = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", 
job.metadataId) + val transformMetadata = getTransformMetadata(job.metadataId!!) + assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 2000, transformMetadata.stats.documentsProcessed) + assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey) + assertNotNull("Continuous stats were not updated", transformMetadata.continuousStats) + assertNotNull("Continuous stats were set, but lastTimestamp was not", transformMetadata.continuousStats!!.lastTimestamp) + transformMetadata + } + + assertEquals("Not the expected transform status", TransformMetadata.Status.STARTED, firstIterationMetadata.status) + assertEquals("Not the expected pages processed", 7, firstIterationMetadata.stats.pagesProcessed) + assertEquals("Not the expected documents indexed", 2000L, firstIterationMetadata.stats.documentsIndexed) + assertEquals("Not the expected documents processed", 2000L, firstIterationMetadata.stats.documentsProcessed) + assertTrue("Doesn't capture indexed time", firstIterationMetadata.stats.indexTimeInMillis > 0) + assertTrue("Didn't capture search time", firstIterationMetadata.stats.searchTimeInMillis > 0) + + disableTransform(transform.id) + } + private fun getStrictMappings(): String { return """ "dynamic": "strict", @@ -619,4 +684,18 @@ class TransformRunnerIT : TransformRestTestCase() { assertIndexExists(indexName) } } + + fun createIndexAndBulkInsert(name: String, settings: Settings, mapping: String?, aliases: String?, bulkData: String) { + // createIndex(name, settings, mapping, aliases) + + val request = Request("POST", "/$name/_bulk/?refresh=true") + request.setJsonEntity(bulkData) + request.options = RequestOptions.DEFAULT.toBuilder().addHeader("content-type", "application/x-ndjson").build() + var res = client().performRequest(request) + assertEquals(RestStatus.OK, res.restStatus()) + + val refreshRequest = Request("POST", "/$name/_refresh") + res = client().performRequest(refreshRequest) + assertEquals(RestStatus.OK, res.restStatus()) + } } From dce28b9f8e4965722fee77f10f9c252f3ba89f77 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 24 Aug 2022 18:45:04 +0200 Subject: [PATCH 4/9] fixed detekt issues Signed-off-by: Petar Dzepina --- .../transform/TransformProcessedBucketLog.kt | 9 ++++++--- .../indexmanagement/transform/TransformRunner.kt | 6 ++++-- .../indexmanagement/transform/TransformRunnerIT.kt | 11 +++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt index a115c74ba..6ba6232b2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt @@ -10,7 +10,10 @@ import java.security.MessageDigest class TransformProcessedBucketLog { - private val maxSize: Int = 100_000_000 + companion object { + const val MAX_SIZE = 100_000_000 + const val HEX_RADIX = 16 + } private var processedBuckets: MutableSet = HashSet() @@ -21,7 +24,7 @@ class TransformProcessedBucketLog { } fun addBucket(bucket: Map) { - if (processedBuckets.size >= maxSize) return + if (processedBuckets.size >= MAX_SIZE) return processedBuckets.add(computeBucketHash(bucket)) } @@ -41,6 +44,6 @@ class TransformProcessedBucketLog { ) } } - return BigInteger(1, md5Crypt.digest()).toString(16) + return BigInteger(1, 
md5Crypt.digest()).toString(HEX_RADIX) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index e7af36a95..d0ef5c74b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -112,7 +112,7 @@ object TransformRunner : TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY), TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT ) - var transformProcessedBucketLog = TransformProcessedBucketLog() + val transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) var lock = acquireLockForScheduledJob(transform, context, backoffPolicy) try { @@ -151,7 +151,9 @@ object TransformRunner : currentMetadata = it.metadata } // Filter out already processed buckets - val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter { transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet() + val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter { + transformProcessedBucketLog.isNotProcessed(it) + }.toMutableSet() // Recompute modified buckets and update them in targetIndex currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets) // Add processed buckets to 'processed set' so that we don't try to reprocess them again diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 9e5ec4836..3bd6b9aa5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -956,9 +956,9 @@ class TransformRunnerIT : TransformRestTestCase() { val sourceIndex = "index_with_lots_of_buckets" - var requestBody: StringBuilder = StringBuilder(100000) + val requestBody: StringBuilder = StringBuilder(100000) for (i in 1..2000) { - var docPayload: String = """ + val docPayload: String = """ { "id1": "$i", "id2": "${i + 1}" @@ -1033,8 +1033,11 @@ class TransformRunnerIT : TransformRestTestCase() { } } - fun createIndexAndBulkInsert(name: String, settings: Settings, mapping: String?, aliases: String?, bulkData: String) { - // createIndex(name, settings, mapping, aliases) + private fun createIndexAndBulkInsert(name: String, settings: Settings, mapping: String?, aliases: String?, bulkData: String) { + + if (settings != null || mapping != null || aliases != null) { + createIndex(name, settings, mapping, aliases) + } val request = Request("POST", "/$name/_bulk/?refresh=true") request.setJsonEntity(bulkData) From 12cf319330b2f991d8d3eb29a7268b2c4ce93f11 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 24 Aug 2022 19:30:12 +0200 Subject: [PATCH 5/9] added comments to test Signed-off-by: Petar Dzepina --- .../indexmanagement/transform/TransformRunnerIT.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 3bd6b9aa5..9d18ef24f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -954,6 +954,7 @@ class TransformRunnerIT : TransformRestTestCase() { 
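Before the test hunk that resumes below, a sketch of the bulk payload this new IT builds: the _bulk endpoint takes newline-delimited JSON, one action line per document followed by its source line. The helper name here is illustrative; the field names follow the test.

    // Builds the IT's NDJSON body: an auto-ID "create" action, then the source,
    // each terminated by a newline as the bulk API requires.
    fun buildBulkBody(docCount: Int): String {
        val body = StringBuilder()
        for (i in 1..docCount) {
            body.append("{\"create\":{}}\n")
            body.append("{\"id1\": \"$i\", \"id2\": \"${i + 1}\"}\n")
        }
        return body.toString()
    }

    fun main() {
        print(buildBulkBody(2))  // two docs -> four lines; the IT posts 2000 docs
    }

Because id1 is unique per document, the two Terms groups put every document in its own bucket, which is what makes 2000 documents exercise the clause-limit path.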
fun `test continuous transform with a lot of buckets`() { + // Create index with high cardinality fields val sourceIndex = "index_with_lots_of_buckets" val requestBody: StringBuilder = StringBuilder(100000) @@ -969,9 +970,9 @@ class TransformRunnerIT : TransformRestTestCase() { } createIndexAndBulkInsert(sourceIndex, Settings.EMPTY, null, null, requestBody.toString()) - + // Source index will have total of 2000 buckets val transform = Transform( - id = "id_111", + id = "transform_index_with_lots_of_buckets", schemaVersion = 1L, enabled = true, enabledAt = Instant.now(), From 60e7f164cf60d41a794cfd469185a580dbf1d966 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 24 Aug 2022 20:47:28 +0200 Subject: [PATCH 6/9] removed debug logging Signed-off-by: Petar Dzepina --- .../opensearch/indexmanagement/transform/TransformRunner.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index d0ef5c74b..62fb4104a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -193,7 +193,6 @@ object TransformRunner : continuousStats = ContinuousTransformStats(newGlobalCheckpointTime, null) ) } - logger.info("executeJob: writting metadata seqNo:${currentMetadata.seqNo}, primaryTerm:${currentMetadata.primaryTerm}") transformMetadataService.writeMetadata(currentMetadata, true) if (!transform.continuous || currentMetadata.status == TransformMetadata.Status.FAILED) { logger.info("Disabling the transform job ${transform.id}") @@ -247,7 +246,6 @@ object TransformRunner : return if (!validationResult.isValid) { val failureMessage = "Failed validation - ${validationResult.issues}" val failureMetadata = transformMetadata.copy(status = TransformMetadata.Status.FAILED, failureReason = failureMessage) - logger.info("validateFailed: writting metadata seqNo:${failureMetadata.seqNo}, primaryTerm:${failureMetadata.primaryTerm}") transformMetadataService.writeMetadata(failureMetadata, true) } else transformMetadata } @@ -309,7 +307,6 @@ object TransformRunner : status = TransformMetadata.Status.STARTED ) } else metadata.copy(lastUpdatedAt = Instant.now(), status = TransformMetadata.Status.STARTED) - logger.info("recomputeModifiedBuckets: writting metadata seqNo:${updatedMetadata.seqNo}, primaryTerm:${updatedMetadata.primaryTerm}") return updatedMetadata } From c3e2cbc7d07721a830b2f3d033ee780d3bd9320d Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 24 Aug 2022 21:38:28 +0200 Subject: [PATCH 7/9] empty commit to trigger checks Signed-off-by: Petar Dzepina From af8f4c348b0e9bca677620da1af332331c8f3c92 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 31 Aug 2022 17:05:41 +0200 Subject: [PATCH 8/9] reduced pageSize to 1 in few ITs to avoid flaky tests; fixed bug where pagesProcessed was calculated incorrectly Signed-off-by: Petar Dzepina --- .../indexmanagement/transform/TransformSearchService.kt | 2 +- .../opensearch/indexmanagement/transform/TransformRunnerIT.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 610538019..f42c22c1c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ 
b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -287,7 +287,7 @@ class TransformSearchService( val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets val documentsProcessed = buckets.fold(0L) { sum, it -> sum + it.docCount } - val pagesProcessed = 1L + val pagesProcessed = if (buckets.isNotEmpty()) 1L else 0L val searchTime = searchResponse.took.millis val stats = TransformStats(pagesProcessed, documentsProcessed, 0, 0, searchTime) val afterKey = aggs.afterKey() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 9d18ef24f..bbf13cb86 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -46,7 +46,7 @@ class TransformRunnerIT : TransformRestTestCase() { sourceIndex = "transform-source-index", targetIndex = "transform-target-index", roles = emptyList(), - pageSize = 100, + pageSize = 1, groups = listOf( Terms(sourceField = "store_and_fwd_flag", targetField = "flag") ) @@ -86,7 +86,7 @@ class TransformRunnerIT : TransformRestTestCase() { sourceIndex = "transform-source-index", targetIndex = "transform-target-index", roles = emptyList(), - pageSize = 100, + pageSize = 1, groups = listOf( Terms(sourceField = "store_and_fwd_flag", targetField = "flag") ), From 9c80b2915e777ad96fb55b5788cbcb9212462a14 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 31 Aug 2022 17:58:59 +0200 Subject: [PATCH 9/9] reverted pagesProcessed change; fixed few ITs Signed-off-by: Petar Dzepina --- .../indexmanagement/transform/TransformSearchService.kt | 2 +- .../opensearch/indexmanagement/transform/TransformRunnerIT.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index f42c22c1c..610538019 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -287,7 +287,7 @@ class TransformSearchService( val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets val documentsProcessed = buckets.fold(0L) { sum, it -> sum + it.docCount } - val pagesProcessed = if (buckets.isNotEmpty()) 1L else 0L + val pagesProcessed = 1L val searchTime = searchResponse.took.millis val stats = TransformStats(pagesProcessed, documentsProcessed, 0, 0, searchTime) val afterKey = aggs.afterKey() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index bbf13cb86..62ef305bd 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -64,7 +64,7 @@ class TransformRunnerIT : TransformRestTestCase() { transformMetadata } - assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed) + assertEquals("More than 
expected pages processed", 3L, metadata.stats.pagesProcessed) assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed) assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed) assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) @@ -1034,7 +1034,7 @@ class TransformRunnerIT : TransformRestTestCase() { } } - private fun createIndexAndBulkInsert(name: String, settings: Settings, mapping: String?, aliases: String?, bulkData: String) { + private fun createIndexAndBulkInsert(name: String, settings: Settings?, mapping: String?, aliases: String?, bulkData: String) { if (settings != null || mapping != null || aliases != null) { createIndex(name, settings, mapping, aliases)