diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt
new file mode 100644
index 000000000..6ba6232b2
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformProcessedBucketLog.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.transform
+
+import java.math.BigInteger
+import java.security.MessageDigest
+
+class TransformProcessedBucketLog {
+
+    companion object {
+        const val MAX_SIZE = 100_000_000
+        const val HEX_RADIX = 16
+    }
+
+    private var processedBuckets: MutableSet<String> = HashSet()
+
+    fun addBuckets(buckets: List<Map<String, Any>>) {
+        buckets.forEach {
+            addBucket(it)
+        }
+    }
+
+    fun addBucket(bucket: Map<String, Any>) {
+        if (processedBuckets.size >= MAX_SIZE) return
+        processedBuckets.add(computeBucketHash(bucket))
+    }
+
+    fun isProcessed(bucket: Map<String, Any>): Boolean {
+        return processedBuckets.contains(computeBucketHash(bucket))
+    }
+
+    fun isNotProcessed(bucket: Map<String, Any>) = !isProcessed(bucket)
+
+    fun computeBucketHash(bucket: Map<String, Any>): String {
+        val md5Crypt = MessageDigest.getInstance("MD5")
+        bucket.entries.sortedBy { it.key }.also {
+            it.forEach { entry ->
+                md5Crypt.update(
+                    if (entry.value == null) "null".toByteArray()
+                    else entry.value.toString().toByteArray()
+                )
+            }
+        }
+        return BigInteger(1, md5Crypt.digest()).toString(HEX_RADIX)
+    }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt
index 07e38fb11..62fb4104a 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt
@@ -112,8 +112,7 @@ object TransformRunner :
             TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
             TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
         )
-
-        var attemptedToIndex = false
+        val transformProcessedBucketLog = TransformProcessedBucketLog()
         var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
         var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
         try {
@@ -134,7 +133,7 @@ object TransformRunner :
                         currentMetadata = validatedMetadata
                         return
                     }
-                    if (transform.continuous && (bucketsToTransform.shardsToSearch == null || bucketsToTransform.currentShard != null)) {
+                    if (transform.continuous) {
                         // If we have not populated the list of shards to search, do so now
                         if (bucketsToTransform.shardsToSearch == null) {
                             // Note the timestamp when we got the shard global checkpoints to the user may know what data is included
@@ -145,11 +144,29 @@ object TransformRunner :
                                 newGlobalCheckpoints
                             )
                         }
-                        bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
-                        currentMetadata = bucketsToTransform.metadata
+                        // If there are shards to search do it here
+                        if (bucketsToTransform.currentShard != null) {
+                            // Computes aggregation on modified documents for current shard to get modified buckets
+                            bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
+                                currentMetadata = it.metadata
+                            }
+                            // Filter out already processed buckets
+                            val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
+                                transformProcessedBucketLog.isNotProcessed(it)
+                            }.toMutableSet()
+                            // Recompute modified buckets and update them in targetIndex
+                            currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
+                            // Add processed buckets to 'processed set' so that we don't try to reprocess them again
+                            transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
+                            // Update TransformMetadata
+                            currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
+                            bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata)
+                        }
                     } else {
-                        currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets)
-                        attemptedToIndex = true
+                        // Computes buckets from source index and stores them in targetIndex as transform docs
+                        currentMetadata = computeBucketsIteration(transform, currentMetadata)
+                        // Update TransformMetadata
+                        currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
                     }
                     // we attempt to renew lock for every loop of transform
                     val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
@@ -159,7 +176,7 @@ object TransformRunner :
                         lock = renewedLock
                     }
                 }
-            } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null || !attemptedToIndex)
+            } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
         } catch (e: Exception) {
             logger.error("Failed to execute the transform job [${transform.id}] because of exception [${e.localizedMessage}]", e)
             currentMetadata = currentMetadata.copy(
@@ -189,6 +206,8 @@ object TransformRunner :
     private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
         var currentBucketsToTransform = bucketsToTransform
         val currentShard = bucketsToTransform.currentShard
+        // Clear modified buckets from previous iteration
+        currentBucketsToTransform.modifiedBuckets.clear()
 
         if (currentShard != null) {
             val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
@@ -236,19 +255,47 @@ object TransformRunner :
      * the range query will not precisely specify the modified buckets. As a result, we increase the range for the query and then filter out
      * the unintended buckets as part of the composite search step.
      */
-    private suspend fun executeTransformIteration(
+    private suspend fun computeBucketsIteration(
+        transform: Transform,
+        metadata: TransformMetadata,
+    ): TransformMetadata {
+
+        val transformSearchResult = withTransformSecurityContext(transform) {
+            transformSearchService.executeCompositeSearch(
+                transform,
+                metadata.afterKey,
+                null
+            )
+        }
+        val indexTimeInMillis = withTransformSecurityContext(transform) {
+            transformIndexer.index(transformSearchResult.docsToIndex)
+        }
+        val afterKey = transformSearchResult.afterKey
+        val stats = transformSearchResult.stats
+        val updatedStats = stats.copy(
+            pagesProcessed = stats.pagesProcessed,
+            indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
+            documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
+        )
+        return metadata.mergeStats(updatedStats).copy(
+            afterKey = afterKey,
+            lastUpdatedAt = Instant.now(),
+            status = if (afterKey == null) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
+        )
+    }
+
+    private suspend fun recomputeModifiedBuckets(
         transform: Transform,
         metadata: TransformMetadata,
         modifiedBuckets: MutableSet<Map<String, Any>>
     ): TransformMetadata {
-        val updatedMetadata = if (!transform.continuous || modifiedBuckets.isNotEmpty()) {
+        val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
             val transformSearchResult = withTransformSecurityContext(transform) {
-                transformSearchService.executeCompositeSearch(transform, metadata.afterKey, if (transform.continuous) modifiedBuckets else null)
+                transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
             }
             val indexTimeInMillis = withTransformSecurityContext(transform) {
                 transformIndexer.index(transformSearchResult.docsToIndex)
             }
-            val afterKey = transformSearchResult.afterKey
             val stats = transformSearchResult.stats
             val updatedStats = stats.copy(
                 pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed,
@@ -256,12 +303,11 @@ object TransformRunner :
                 documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
             )
             metadata.mergeStats(updatedStats).copy(
-                afterKey = afterKey,
                 lastUpdatedAt = Instant.now(),
-                status = if (afterKey == null && !transform.continuous) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
+                status = TransformMetadata.Status.STARTED
             )
         } else metadata.copy(lastUpdatedAt = Instant.now(), status = TransformMetadata.Status.STARTED)
-        return transformMetadataService.writeMetadata(updatedMetadata, true)
+        return updatedMetadata
     }
 
     private suspend fun <T> withTransformSecurityContext(transform: Transform, block: suspend CoroutineScope.() -> T): T {
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt
index 3e22b68ce..3e02c7d46 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt
@@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.transform.model.TransformSearchResult
 import org.opensearch.indexmanagement.transform.model.TransformStats
 import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT
 import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS
+import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES
 import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
 import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
 import org.opensearch.rest.RestStatus
@@ -112,10 +113,11 @@ class TransformSearchService(
     suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map<String, Any>?, currentShard: ShardNewDocuments): BucketSearchResult {
         try {
             var retryAttempt = 0
+            var pageSize = calculateMaxPageSize(transform)
             val searchResponse = backoffPolicy.retry(logger) {
                 val pageSizeDecay = 2f.pow(retryAttempt++)
                 client.suspendUntil { listener: ActionListener<SearchResponse> ->
-                    val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
+                    pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
                     if (retryAttempt > 1) {
                         logger.debug(
                             "Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " +
@@ -139,6 +141,14 @@ class TransformSearchService(
         }
     }
 
+    /**
+     * Apache Lucene has maxClauses limit which we could trip during recomputing of modified buckets(continuous transform)
+     * due to trying to match too many bucket fields. To avoid this, we control how many buckets we recompute at a time(pageSize)
+     */
+    private fun calculateMaxPageSize(transform: Transform): Int {
+        return minOf(transform.pageSize, LUCENE_MAX_CLAUSES / (transform.groups.size + 1))
+    }
+
     @Suppress("RethrowCaughtException")
     suspend fun executeCompositeSearch(
         transform: Transform,
@@ -146,12 +156,18 @@ class TransformSearchService(
         modifiedBuckets: MutableSet<Map<String, Any>>? = null
     ): TransformSearchResult {
         try {
+            var pageSize: Int =
+                if (modifiedBuckets.isNullOrEmpty())
+                    transform.pageSize
+                else
+                    modifiedBuckets.size
+
             var retryAttempt = 0
             val searchResponse = backoffPolicy.retry(logger) {
                 // TODO: Should we store the value of the past successful page size (?)
                 val pageSizeDecay = 2f.pow(retryAttempt++)
                 client.suspendUntil { listener: ActionListener<SearchResponse> ->
-                    val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
+                    pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
                     if (retryAttempt > 1) {
                         logger.debug(
                             "Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. Attempting " +
Attempting " + diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index 136d3a900..9986216a5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -34,6 +34,7 @@ class IndexUtils { const val SCHEMA_VERSION = "schema_version" const val DEFAULT_SCHEMA_VERSION = 1L const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#" + const val LUCENE_MAX_CLAUSES = 1024 private const val BYTE_ARRAY_SIZE = 16 private const val DOCUMENT_ID_SEED = 72390L val logger = LogManager.getLogger(IndexUtils::class.java) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 6b5cd63c8..bed1b568f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -7,6 +7,8 @@ package org.opensearch.indexmanagement.transform import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity +import org.opensearch.client.Request +import org.opensearch.client.RequestOptions import org.opensearch.common.settings.Settings import org.opensearch.index.query.TermQueryBuilder import org.opensearch.indexmanagement.common.model.dimension.DateHistogram @@ -44,7 +46,7 @@ class TransformRunnerIT : TransformRestTestCase() { sourceIndex = "transform-source-index", targetIndex = "transform-target-index", roles = emptyList(), - pageSize = 10, + pageSize = 1, groups = listOf( Terms(sourceField = "store_and_fwd_flag", targetField = "flag") ) @@ -62,7 +64,7 @@ class TransformRunnerIT : TransformRestTestCase() { transformMetadata } - assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed) + assertEquals("More than expected pages processed", 3L, metadata.stats.pagesProcessed) assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed) assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed) assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0) @@ -84,7 +86,7 @@ class TransformRunnerIT : TransformRestTestCase() { sourceIndex = "transform-source-index", targetIndex = "transform-target-index", roles = emptyList(), - pageSize = 10, + pageSize = 1, groups = listOf( Terms(sourceField = "store_and_fwd_flag", targetField = "flag") ), @@ -948,6 +950,70 @@ class TransformRunnerIT : TransformRestTestCase() { } } + fun `test continuous transform with a lot of buckets`() { + + // Create index with high cardinality fields + val sourceIndex = "index_with_lots_of_buckets" + + val requestBody: StringBuilder = StringBuilder(100000) + for (i in 1..2000) { + val docPayload: String = """ + { + "id1": "$i", + "id2": "${i + 1}" + } + """.trimIndent().replace(Regex("[\n\r\\s]"), "") + + requestBody.append("{\"create\":{}}\n").append(docPayload).append('\n') + } + + createIndexAndBulkInsert(sourceIndex, Settings.EMPTY, null, null, requestBody.toString()) + // Source index will have total of 2000 buckets + val transform = Transform( + id = "transform_index_with_lots_of_buckets", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + 
sourceIndex = "index_with_lots_of_buckets", + targetIndex = "index_with_lots_of_buckets_transformed", + roles = emptyList(), + pageSize = 1000, + groups = listOf( + Terms(sourceField = "id1.keyword", targetField = "id1"), + Terms(sourceField = "id2.keyword", targetField = "id2") + ), + continuous = true + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } + + val firstIterationMetadata = waitFor { + val job = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", job.metadataId) + val transformMetadata = getTransformMetadata(job.metadataId!!) + assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 2000, transformMetadata.stats.documentsProcessed) + assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey) + assertNotNull("Continuous stats were not updated", transformMetadata.continuousStats) + assertNotNull("Continuous stats were set, but lastTimestamp was not", transformMetadata.continuousStats!!.lastTimestamp) + transformMetadata + } + + assertEquals("Not the expected transform status", TransformMetadata.Status.STARTED, firstIterationMetadata.status) + assertEquals("Not the expected pages processed", 7, firstIterationMetadata.stats.pagesProcessed) + assertEquals("Not the expected documents indexed", 2000L, firstIterationMetadata.stats.documentsIndexed) + assertEquals("Not the expected documents processed", 2000L, firstIterationMetadata.stats.documentsProcessed) + assertTrue("Doesn't capture indexed time", firstIterationMetadata.stats.indexTimeInMillis > 0) + assertTrue("Didn't capture search time", firstIterationMetadata.stats.searchTimeInMillis > 0) + + disableTransform(transform.id) + } + private fun getStrictMappings(): String { return """ "dynamic": "strict", @@ -965,4 +1031,21 @@ class TransformRunnerIT : TransformRestTestCase() { assertIndexExists(indexName) } } + + private fun createIndexAndBulkInsert(name: String, settings: Settings?, mapping: String?, aliases: String?, bulkData: String) { + + if (settings != null || mapping != null || aliases != null) { + createIndex(name, settings, mapping, aliases) + } + + val request = Request("POST", "/$name/_bulk/?refresh=true") + request.setJsonEntity(bulkData) + request.options = RequestOptions.DEFAULT.toBuilder().addHeader("content-type", "application/x-ndjson").build() + var res = client().performRequest(request) + assertEquals(RestStatus.OK, res.restStatus()) + + val refreshRequest = Request("POST", "/$name/_refresh") + res = client().performRequest(refreshRequest) + assertEquals(RestStatus.OK, res.restStatus()) + } }