diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index 2bbfd6092..988fa9643 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -214,7 +214,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag } is AvgAggregationBuilder -> { ScriptedMetricAggregationBuilder(aggregationBuilder.name) - .initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0; state.counts = 0;", emptyMap())) + .initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0D; state.counts = 0L;", emptyMap())) .mapScript( Script( ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, @@ -227,7 +227,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag .combineScript( Script( ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, - "def d = new long[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap() + "def d = new double[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap() ) ) .reduceScript( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 87997e3a0..60898e7bb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import java.time.Instant import java.time.temporal.ChronoUnit @@ -67,6 +68,75 @@ class RollupRunnerIT : RollupRestTestCase() { } } + fun `test rollup with avg metric`() { + val sourceIdxTestName = "source_idx_test" + val targetIdxTestName = "target_idx_test" + val propertyName = "passenger_count" + val avgMetricName = "avg_passenger_count" + + generateNYCTaxiData(sourceIdxTestName) + + val rollup = Rollup( + id = "rollup_test", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic stats test", + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + metadataID = null, + roles = emptyList(), + pageSize = 100, + delay = 0, + continuous = false, + dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")), + metrics = listOf( + RollupMetrics(sourceField = propertyName, targetField = propertyName, metrics = listOf(Average())) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) } + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + + // Term query + var req = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "$avgMetricName": { + "avg": { + "field": "$propertyName" + } + } + } + } + """.trimIndent() + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + var rollupRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + var rawAggRes = rawRes.asMap()["aggregations"] as Map> + var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same avg results", + rawAggRes.getValue(avgMetricName)["value"], + rollupAggRes.getValue(avgMetricName)["value"] + ) + } + } + fun `test metadata is created for data stream rollup job when none exists`() { val dataStreamName = "test-data-stream"