[Backport 2.x] Transform maxclauses fix (#477) (#487)
* Transform maxclauses fix (#477)

* transform maxClauses fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added bucket log to track processed buckets

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* various renames/changes

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed detekt issues

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added comments to test

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* removed debug logging

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* empty commit to trigger checks

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* reduced pageSize to 1 in a few ITs to avoid flaky tests; fixed a bug where pagesProcessed was calculated incorrectly

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* reverted pagesProcessed change; fixed a few ITs

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit 7475cfd)

* Update TransformRunnerIT.kt

Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
Angie-Zhang and petardz authored Sep 2, 2022
1 parent 4e0d86f commit c0138c4
Showing 5 changed files with 215 additions and 20 deletions.
TransformProcessedBucketLog.kt (new file)
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.transform

import java.math.BigInteger
import java.security.MessageDigest

class TransformProcessedBucketLog {

companion object {
const val MAX_SIZE = 100_000_000
const val HEX_RADIX = 16
}

private var processedBuckets: MutableSet<String> = HashSet()

fun addBuckets(buckets: List<Map<String, Any>>) {
buckets.forEach {
addBucket(it)
}
}

fun addBucket(bucket: Map<String, Any>) {
if (processedBuckets.size >= MAX_SIZE) return
processedBuckets.add(computeBucketHash(bucket))
}

fun isProcessed(bucket: Map<String, Any>): Boolean {
return processedBuckets.contains(computeBucketHash(bucket))
}

fun isNotProcessed(bucket: Map<String, Any>) = !isProcessed(bucket)

fun computeBucketHash(bucket: Map<String, Any>): String {
val md5Crypt = MessageDigest.getInstance("MD5")
bucket.entries.sortedBy { it.key }.also {
it.forEach { entry ->
md5Crypt.update(
if (entry.value == null) "null".toByteArray()
else entry.value.toString().toByteArray()
)
}
}
return BigInteger(1, md5Crypt.digest()).toString(HEX_RADIX)
}
}
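
The new bucket log is just an MD5-keyed set of bucket hashes. A minimal usage sketch (illustrative only — the standalone setup and the helper name dedupeBuckets are not part of this commit; the runner below wires the log in differently):

import org.opensearch.indexmanagement.transform.TransformProcessedBucketLog

val transformProcessedBucketLog = TransformProcessedBucketLog()

// Keep only buckets not seen earlier in this job run, then record them so
// later shards/pages skip recomputing them.
fun dedupeBuckets(modifiedBuckets: Set<Map<String, Any>>): MutableSet<Map<String, Any>> {
    val unseen = modifiedBuckets.filter { transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet()
    transformProcessedBucketLog.addBuckets(unseen.toList())
    return unseen
}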
TransformRunner.kt
@@ -112,8 +112,7 @@ object TransformRunner :
TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
)

var attemptedToIndex = false
val transformProcessedBucketLog = TransformProcessedBucketLog()
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)
try {
@@ -134,7 +133,7 @@ object TransformRunner :
currentMetadata = validatedMetadata
return
}
if (transform.continuous && (bucketsToTransform.shardsToSearch == null || bucketsToTransform.currentShard != null)) {
if (transform.continuous) {
// If we have not populated the list of shards to search, do so now
if (bucketsToTransform.shardsToSearch == null) {
// Note the timestamp when we got the shard global checkpoints to the user may know what data is included
@@ -145,11 +144,29 @@ object TransformRunner :
newGlobalCheckpoints
)
}
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
currentMetadata = bucketsToTransform.metadata
// If there are shards to search do it here
if (bucketsToTransform.currentShard != null) {
// Computes aggregation on modified documents for current shard to get modified buckets
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
currentMetadata = it.metadata
}
// Filter out already processed buckets
val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
transformProcessedBucketLog.isNotProcessed(it)
}.toMutableSet()
// Recompute modified buckets and update them in targetIndex
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
// Update TransformMetadata
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
bucketsToTransform = bucketsToTransform.copy(metadata = currentMetadata)
}
} else {
currentMetadata = executeTransformIteration(transform, currentMetadata, bucketsToTransform.modifiedBuckets)
attemptedToIndex = true
// Computes buckets from source index and stores them in targetIndex as transform docs
currentMetadata = computeBucketsIteration(transform, currentMetadata)
// Update TransformMetadata
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
}
// we attempt to renew lock for every loop of transform
val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
@@ -159,7 +176,7 @@ object TransformRunner :
lock = renewedLock
}
}
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null || !attemptedToIndex)
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
} catch (e: Exception) {
logger.error("Failed to execute the transform job [${transform.id}] because of exception [${e.localizedMessage}]", e)
currentMetadata = currentMetadata.copy(
@@ -189,6 +206,8 @@ object TransformRunner :
private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
var currentBucketsToTransform = bucketsToTransform
val currentShard = bucketsToTransform.currentShard
// Clear modified buckets from previous iteration
currentBucketsToTransform.modifiedBuckets.clear()

if (currentShard != null) {
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
Expand Down Expand Up @@ -236,32 +255,59 @@ object TransformRunner :
* the range query will not precisely specify the modified buckets. As a result, we increase the range for the query and then filter out
* the unintended buckets as part of the composite search step.
*/
private suspend fun executeTransformIteration(
private suspend fun computeBucketsIteration(
transform: Transform,
metadata: TransformMetadata,
): TransformMetadata {

val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(
transform,
metadata.afterKey,
null
)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
transformIndexer.index(transformSearchResult.docsToIndex)
}
val afterKey = transformSearchResult.afterKey
val stats = transformSearchResult.stats
val updatedStats = stats.copy(
pagesProcessed = stats.pagesProcessed,
indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
)
return metadata.mergeStats(updatedStats).copy(
afterKey = afterKey,
lastUpdatedAt = Instant.now(),
status = if (afterKey == null) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
)
}

private suspend fun recomputeModifiedBuckets(
transform: Transform,
metadata: TransformMetadata,
modifiedBuckets: MutableSet<Map<String, Any>>
): TransformMetadata {
val updatedMetadata = if (!transform.continuous || modifiedBuckets.isNotEmpty()) {
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(transform, metadata.afterKey, if (transform.continuous) modifiedBuckets else null)
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
transformIndexer.index(transformSearchResult.docsToIndex)
}
val afterKey = transformSearchResult.afterKey
val stats = transformSearchResult.stats
val updatedStats = stats.copy(
pagesProcessed = if (transform.continuous) 0 else stats.pagesProcessed,
indexTimeInMillis = stats.indexTimeInMillis + indexTimeInMillis,
documentsIndexed = transformSearchResult.docsToIndex.size.toLong()
)
metadata.mergeStats(updatedStats).copy(
afterKey = afterKey,
lastUpdatedAt = Instant.now(),
status = if (afterKey == null && !transform.continuous) TransformMetadata.Status.FINISHED else TransformMetadata.Status.STARTED
status = TransformMetadata.Status.STARTED
)
} else metadata.copy(lastUpdatedAt = Instant.now(), status = TransformMetadata.Status.STARTED)
return transformMetadataService.writeMetadata(updatedMetadata, true)
return updatedMetadata
}

private suspend fun <T> withTransformSecurityContext(transform: Transform, block: suspend CoroutineScope.() -> T): T {
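
Pieced together from the hunks above, the continuous path of the run loop now behaves roughly as follows (a condensed sketch of the control flow in this diff — lock renewal, shard bookkeeping, and error handling are elided, so this is not the literal method body):

do {
    if (transform.continuous) {
        // shardsToSearch is populated from the shard global checkpoints on the
        // first pass (elided here)
        if (bucketsToTransform.currentShard != null) {
            // 1. Aggregate the current shard's modified documents into candidate buckets
            bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform)
            currentMetadata = bucketsToTransform.metadata
            // 2. Drop buckets already recomputed earlier in this run
            val newBuckets = bucketsToTransform.modifiedBuckets
                .filter { transformProcessedBucketLog.isNotProcessed(it) }
                .toMutableSet()
            // 3. Recompute only the new buckets and index them into the target index
            currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, newBuckets)
            // 4. Remember them so later shards/pages skip them
            transformProcessedBucketLog.addBuckets(newBuckets.toList())
            currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
        }
    } else {
        // Non-continuous: page through the source index via the composite afterKey
        currentMetadata = computeBucketsIteration(transform, currentMetadata)
        currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
    }
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)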
TransformSearchService.kt
@@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.transform.model.TransformSearchResult
import org.opensearch.indexmanagement.transform.model.TransformStats
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT
import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS
import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
import org.opensearch.rest.RestStatus
@@ -112,10 +113,11 @@ class TransformSearchService(
suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map<String, Any>?, currentShard: ShardNewDocuments): BucketSearchResult {
try {
var retryAttempt = 0
var pageSize = calculateMaxPageSize(transform)
val searchResponse = backoffPolicy.retry(logger) {
val pageSizeDecay = 2f.pow(retryAttempt++)
client.suspendUntil { listener: ActionListener<SearchResponse> ->
val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
if (retryAttempt > 1) {
logger.debug(
"Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " +
@@ -139,19 +141,33 @@ class TransformSearchService(
}
}

/**
* Apache Lucene has a maxClauses limit which we could trip while recomputing modified buckets (continuous transforms)
* because the query tries to match too many bucket fields. To avoid this, we control how many buckets we recompute at a time (pageSize).
*/
private fun calculateMaxPageSize(transform: Transform): Int {
return minOf(transform.pageSize, LUCENE_MAX_CLAUSES / (transform.groups.size + 1))
}

@Suppress("RethrowCaughtException")
suspend fun executeCompositeSearch(
transform: Transform,
afterKey: Map<String, Any>? = null,
modifiedBuckets: MutableSet<Map<String, Any>>? = null
): TransformSearchResult {
try {
var pageSize: Int =
if (modifiedBuckets.isNullOrEmpty())
transform.pageSize
else
modifiedBuckets.size

var retryAttempt = 0
val searchResponse = backoffPolicy.retry(logger) {
// TODO: Should we store the value of the past successful page size (?)
val pageSizeDecay = 2f.pow(retryAttempt++)
client.suspendUntil { listener: ActionListener<SearchResponse> ->
val pageSize = max(1, transform.pageSize.div(pageSizeDecay.toInt()))
pageSize = max(1, pageSize.div(pageSizeDecay.toInt()))
if (retryAttempt > 1) {
logger.debug(
"Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. Attempting " +
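
A quick back-of-the-envelope check of the new cap (a standalone sketch; the two group-by fields, pageSize of 1000, and 2000 buckets mirror the new integration test below):

fun main() {
    // calculateMaxPageSize for a continuous transform grouped on two fields
    // with transform.pageSize = 1000:
    val luceneMaxClauses = 1024                          // IndexUtils.LUCENE_MAX_CLAUSES
    val cappedPageSize = minOf(1000, luceneMaxClauses / (2 + 1))
    println(cappedPageSize)  // 341 -> 2000 modified buckets are recomputed a few
                             // hundred at a time instead of in one query that
                             // would exceed Lucene's boolean clause limit
}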
IndexUtils.kt
@@ -33,6 +33,7 @@ class IndexUtils {
const val SCHEMA_VERSION = "schema_version"
const val DEFAULT_SCHEMA_VERSION = 1L
const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#"
const val LUCENE_MAX_CLAUSES = 1024
private const val BYTE_ARRAY_SIZE = 16
private const val DOCUMENT_ID_SEED = 72390L

TransformRunnerIT.kt
@@ -7,6 +7,8 @@ package org.opensearch.indexmanagement.transform

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.common.settings.Settings
import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
@@ -44,7 +46,7 @@ class TransformRunnerIT : TransformRestTestCase() {
sourceIndex = "transform-source-index",
targetIndex = "transform-target-index",
roles = emptyList(),
pageSize = 100,
pageSize = 1,
groups = listOf(
Terms(sourceField = "store_and_fwd_flag", targetField = "flag")
)
@@ -62,7 +64,7 @@ class TransformRunnerIT : TransformRestTestCase() {
transformMetadata
}

assertEquals("More than expected pages processed", 2L, metadata.stats.pagesProcessed)
assertEquals("More than expected pages processed", 3L, metadata.stats.pagesProcessed)
assertEquals("More than expected documents indexed", 2L, metadata.stats.documentsIndexed)
assertEquals("More than expected documents processed", 5000L, metadata.stats.documentsProcessed)
assertTrue("Doesn't capture indexed time", metadata.stats.indexTimeInMillis > 0)
@@ -84,7 +86,7 @@ class TransformRunnerIT : TransformRestTestCase() {
sourceIndex = "transform-source-index",
targetIndex = "transform-target-index",
roles = emptyList(),
pageSize = 100,
pageSize = 1,
groups = listOf(
Terms(sourceField = "store_and_fwd_flag", targetField = "flag")
),
@@ -950,6 +952,70 @@ class TransformRunnerIT : TransformRestTestCase() {
}
}

fun `test continuous transform with a lot of buckets`() {

// Create index with high cardinality fields
val sourceIndex = "index_with_lots_of_buckets"

val requestBody: StringBuilder = StringBuilder(100000)
for (i in 1..2000) {
val docPayload: String = """
{
"id1": "$i",
"id2": "${i + 1}"
}
""".trimIndent().replace(Regex("[\n\r\\s]"), "")

requestBody.append("{\"create\":{}}\n").append(docPayload).append('\n')
}

createIndexAndBulkInsert(sourceIndex, Settings.EMPTY, null, null, requestBody.toString())
// Source index will have total of 2000 buckets
val transform = Transform(
id = "transform_index_with_lots_of_buckets",
schemaVersion = 1L,
enabled = true,
enabledAt = Instant.now(),
updatedAt = Instant.now(),
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
description = "test transform",
metadataId = null,
sourceIndex = "index_with_lots_of_buckets",
targetIndex = "index_with_lots_of_buckets_transformed",
roles = emptyList(),
pageSize = 1000,
groups = listOf(
Terms(sourceField = "id1.keyword", targetField = "id1"),
Terms(sourceField = "id2.keyword", targetField = "id2")
),
continuous = true
).let { createTransform(it, it.id) }

updateTransformStartTime(transform)

waitFor { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) }

val firstIterationMetadata = waitFor {
val job = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", job.metadataId)
val transformMetadata = getTransformMetadata(job.metadataId!!)
assertEquals("Transform did not complete iteration or had incorrect number of documents processed", 2000, transformMetadata.stats.documentsProcessed)
assertEquals("Transform did not complete iteration", null, transformMetadata.afterKey)
assertNotNull("Continuous stats were not updated", transformMetadata.continuousStats)
assertNotNull("Continuous stats were set, but lastTimestamp was not", transformMetadata.continuousStats!!.lastTimestamp)
transformMetadata
}

assertEquals("Not the expected transform status", TransformMetadata.Status.STARTED, firstIterationMetadata.status)
assertEquals("Not the expected pages processed", 7, firstIterationMetadata.stats.pagesProcessed)
assertEquals("Not the expected documents indexed", 2000L, firstIterationMetadata.stats.documentsIndexed)
assertEquals("Not the expected documents processed", 2000L, firstIterationMetadata.stats.documentsProcessed)
assertTrue("Doesn't capture indexed time", firstIterationMetadata.stats.indexTimeInMillis > 0)
assertTrue("Didn't capture search time", firstIterationMetadata.stats.searchTimeInMillis > 0)

disableTransform(transform.id)
}

private fun getStrictMappings(): String {
return """
"dynamic": "strict",
@@ -967,4 +1033,21 @@
assertIndexExists(indexName)
}
}

private fun createIndexAndBulkInsert(name: String, settings: Settings?, mapping: String?, aliases: String?, bulkData: String) {

if (settings != null || mapping != null || aliases != null) {
createIndex(name, settings, mapping, aliases)
}

val request = Request("POST", "/$name/_bulk/?refresh=true")
request.setJsonEntity(bulkData)
request.options = RequestOptions.DEFAULT.toBuilder().addHeader("content-type", "application/x-ndjson").build()
var res = client().performRequest(request)
assertEquals(RestStatus.OK, res.restStatus())

val refreshRequest = Request("POST", "/$name/_refresh")
res = client().performRequest(refreshRequest)
assertEquals(RestStatus.OK, res.restStatus())
}
}
