Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.3] 64: Added rounding when using aggreagate script for avg metric. Added… #553

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag
}
is AvgAggregationBuilder -> {
ScriptedMetricAggregationBuilder(aggregationBuilder.name)
.initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0; state.counts = 0;", emptyMap()))
.initScript(Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "state.sums = 0D; state.counts = 0L;", emptyMap()))
.mapScript(
Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
Expand All @@ -227,7 +227,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag
.combineScript(
Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
"def d = new long[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap()
"def d = new double[2]; d[0] = state.sums; d[1] = state.counts; return d", emptyMap()
)
)
.reduceScript(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_SEARCH_BACKOFF_COUNT
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -67,6 +68,75 @@ class RollupRunnerIT : RollupRestTestCase() {
}
}

fun `test rollup with avg metric`() {
val sourceIdxTestName = "source_idx_test"
val targetIdxTestName = "target_idx_test"
val propertyName = "passenger_count"
val avgMetricName = "avg_passenger_count"

generateNYCTaxiData(sourceIdxTestName)

val rollup = Rollup(
id = "rollup_test",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic stats test",
sourceIndex = sourceIdxTestName,
targetIndex = targetIdxTestName,
metadataID = null,
roles = emptyList(),
pageSize = 100,
delay = 0,
continuous = false,
dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")),
metrics = listOf(
RollupMetrics(sourceField = propertyName, targetField = propertyName, metrics = listOf(Average()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) }

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)

// Term query
var req = """
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"$avgMetricName": {
"avg": {
"field": "$propertyName"
}
}
}
}
""".trimIndent()
var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rollupRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupRes.restStatus() == RestStatus.OK)
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Source and rollup index did not return same avg results",
rawAggRes.getValue(avgMetricName)["value"],
rollupAggRes.getValue(avgMetricName)["value"]
)
}
}

fun `test metadata is created for data stream rollup job when none exists`() {
val dataStreamName = "test-data-stream"

Expand Down