Skip to content

Commit

Permalink
[Backport 1.3]Transform maxclauses fix (#477) (#482) (#489)
Browse files Browse the repository at this point in the history
* Transform maxclauses fix (#477) (#482)

* transform maxClauses fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added bucket log to track processed buckets

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* various renames/changes

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed detekt issues

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added comments to test

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* removed debug logging

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* empty commit to trigger checks

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* reduced pageSize to 1 in few ITs to avoid flaky tests; fixed bug where pagesProcessed was calculated incorrectly

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* reverted pagesProcessed change; fixed few ITs

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit 7475cfd)

Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit b39bd64)

* Update kotlin 1.6.10 to fix kotlin-stdlib-1.4.0 vulnerability

Signed-off-by: Angie Zhang <langelzh@amazon.com>

* Update kotlin 1.4.32 to fix kotlin-stdlib-1.4.0 vulnerability

Signed-off-by: Angie Zhang <langelzh@amazon.com>

Signed-off-by: Angie Zhang <langelzh@amazon.com>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
  • Loading branch information
Angie Zhang and opensearch-trigger-bot[bot] authored Oct 4, 2022
1 parent 8b39773 commit 73161b5
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ buildscript {
notification_version = System.getProperty("notification.version", opensearch_build)
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.4.0")
kotlin_version = System.getProperty("kotlin.version", "1.4.32")

opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
job_scheduler_no_snapshot = job_scheduler_version.replace("-SNAPSHOT","")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.transform

import java.math.BigInteger
import java.security.MessageDigest

class TransformProcessedBucketLog {

companion object {
const val MAX_SIZE = 100_000_000
const val HEX_RADIX = 16
}

private var processedBuckets: MutableSet<String> = HashSet()

fun addBuckets(buckets: List<Map<String, Any>>) {
buckets.forEach {
addBucket(it)
}
}

fun addBucket(bucket: Map<String, Any>) {
if (processedBuckets.size >= MAX_SIZE) return
processedBuckets.add(computeBucketHash(bucket))
}

fun isProcessed(bucket: Map<String, Any>): Boolean {
return processedBuckets.contains(computeBucketHash(bucket))
}

fun isNotProcessed(bucket: Map<String, Any>) = !isProcessed(bucket)

fun computeBucketHash(bucket: Map<String, Any>): String {
val md5Crypt = MessageDigest.getInstance("MD5")
bucket.entries.sortedBy { it.key }.also {
it.forEach { entry ->
md5Crypt.update(
if (entry.value == null) "null".toByteArray()
else entry.value.toString().toByteArray()
)
}
}
return BigInteger(1, md5Crypt.digest()).toString(HEX_RADIX)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ object TransformRunner :
TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
)

var attemptedToIndex = false
val transformProcessedBucketLog = TransformProcessedBucketLog()
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
try {
Expand All @@ -134,7 +133,7 @@ object TransformRunner :
currentMetadata = validatedMetadata
return
}
if (transform.continuous && (bucketsToTransform.shardsToSearch == null || bucketsToTransform.currentShard != null)) {
if (transform.continuous) {
// If we have not populated the list of shards to search, do so now
if (bucketsToTransform.shardsToSearch == null) {
// Note the timestamp when we got the shard global checkpoints to the user may know what data is included
Expand All @@ -145,11 +144,29 @@ object TransformRunner :
newGlobalCheckpoints
)
}
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
currentMetadata = bucketsToTransform.metadata
// If there are shards to search do it here
if (bucketsToTransform.currentShard != null) {
// Computes aggregation on modified documents for current shard to get modified buckets
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
currentMetadata = it.metadata
}
// Filter out already processed buckets
val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
transformProcessedBucketLog.isNotProcessed(it)
}.toMutableSet()
// Recompute modified buckets and update them in targetIndex
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
// Update TransformMetadata
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata)
}
} else {
currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets)
attemptedToIndex = true
// Computes buckets from source index and stores them in targetIndex as transform docs
currentMetadata = computeBucketsIteration(transform, currentMetadata)
// Update TransformMetadata
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
}
// we attempt to renew lock for every loop of transform
val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
Expand All @@ -159,7 +176,7 @@ object TransformRunner :
lock = renewedLock
}
}
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null || !attemptedToIndex)
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
} catch (e: Exception) {
logger.error("Failed to execute the transform job [${transform.id}] because of exception [${e.localizedMessage}]", e)
currentMetadata = currentMetadata.copy(
Expand Down Expand Up @@ -189,6 +206,8 @@ object TransformRunner :
private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
var currentBucketsToTransform = bucketsToTransform
val currentShard = bucketsToTransform.currentShard
// Clear modified buckets from previous iteration
currentBucketsToTransform.modifiedBuckets.clear()

if (currentShard != null) {
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
Expand Down Expand Up @@ -236,32 +255,59 @@ object TransformRunner :
* the range query will not precisely specify the modified buckets. As a result, we increase the range for the query and then filter out
* the unintended buckets as part of the composite search step.
*/
private suspend fun executeTransformIteration(
private suspend fun computeBucketsIteration(
transform: Transform,
metadata: TransformMetadata,
): TransformMetadata {

val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(
transform,
metadata.afterKey,
null
)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
transformIndexer.index(transformSearchResult.docsToIndex)
}
val afterKey = transformSearchResult.afterKey
val stats = transformSearchResult.stats
val updatedStats = stats.copy(
pagesProcessed = stats.pagesProcessed,
indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
)
return metadata.mergeStats(updatedStats).copy(
afterKey = afterKey,
lastUpdatedAt = Instant.now(),
status = if (afterKey == null) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
)
}

private suspend fun recomputeModifiedBuckets(
transform: Transform,
metadata: TransformMetadata,
modifiedBuckets: MutableSet<Map<String, Any>>
): TransformMetadata {
val updatedMetadata = if (!transform.continuous || modifiedBuckets.isNotEmpty()) {
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(transform, metadata.afterKey, if (transform.continuous) modifiedBuckets else null)
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
transformIndexer.index(transformSearchResult.docsToIndex)
}
val afterKey = transformSearchResult.afterKey
val stats = transformSearchResult.stats
val updatedStats = stats.copy(
pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed,
indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
)
metadata.mergeStats(updatedStats).copy(
afterKey = afterKey,
lastUpdatedAt = Instant.now(),
status = if (afterKey == null && !transform.continuous) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
status = TransformMetadata.Status.STARTED
)
} else metadata.copy(lastUpdatedAt = Instant.now(), status = TransformMetadata.Status.STARTED)
return transformMetadataService.writeMetadata(updatedMetadata, true)
return updatedMetadata
}

private suspend fun <T> withTransformSecurityContext(transform: Transform, block: suspend CoroutineScope.() -> T): T {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.transform.model.TransformSearchResult
import org.opensearch.indexmanagement.transform.model.TransformStats
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS
import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
import org.opensearch.rest.RestStatus
Expand Down Expand Up @@ -112,10 +113,11 @@ class TransformSearchService(
suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map<String, Any>?, currentShard: ShardNewDocuments): BucketSearchResult {
try {
var retryAttempt = 0
var pageSize = calculateMaxPageSize(transform)
val searchResponse = backoffPolicy.retry(logger) {
val pageSizeDecay = 2f.pow(retryAttempt++)
client.suspendUntil { listener: ActionListener<SearchResponse> ->
val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
if (retryAttempt > 1) {
logger.debug(
"Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " +
Expand All @@ -139,19 +141,33 @@ class TransformSearchService(
}
}

/**
* Apache Lucene has maxClauses limit which we could trip during recomputing of modified buckets(continuous transform)
* due to trying to match too many bucket fields. To avoid this, we control how many buckets we recompute at a time(pageSize)
*/
private fun calculateMaxPageSize(transform: Transform): Int {
return minOf(transform.pageSize, LUCENE_MAX_CLAUSES / (transform.groups.size + 1))
}

@Suppress("RethrowCaughtException")
suspend fun executeCompositeSearch(
transform: Transform,
afterKey: Map<String, Any>? = null,
modifiedBuckets: MutableSet<Map<String, Any>>? = null
): TransformSearchResult {
try {
var pageSize: Int =
if (modifiedBuckets.isNullOrEmpty())
transform.pageSize
else
modifiedBuckets.size

var retryAttempt = 0
val searchResponse = backoffPolicy.retry(logger) {
// TODO: Should we store the value of the past successful page size (?)
val pageSizeDecay = 2f.pow(retryAttempt++)
client.suspendUntil { listener: ActionListener<SearchResponse> ->
val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
if (retryAttempt > 1) {
logger.debug(
"Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. Attempting " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class IndexUtils {
const val SCHEMA_VERSION = "schema_version"
const val DEFAULT_SCHEMA_VERSION = 1L
const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#"
const val LUCENE_MAX_CLAUSES = 1024
private const val BYTE_ARRAY_SIZE = 16
private const val DOCUMENT_ID_SEED = 72390L
val logger = LogManager.getLogger(IndexUtils::class.java)
Expand Down
Loading

0 comments on commit 73161b5

Please sign in to comment.