From 7550c7b0d46adf86b0a17da284b3c88f80f5cf00 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Tue, 1 Mar 2022 09:02:05 -0800 Subject: [PATCH 1/3] Add connection to triggers for doc level alerting Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/AlertingPlugin.kt | 4 +- .../org/opensearch/alerting/MonitorRunner.kt | 145 +++++++++++----- .../org/opensearch/alerting/TriggerService.kt | 35 +++- .../alerting/model/DocumentLevelTrigger.kt | 164 ++++++++++++++++++ .../model/DocumentLevelTriggerRunResult.kt | 66 +++++++ .../org/opensearch/alerting/model/Trigger.kt | 2 + .../DocumentLevelTriggerExecutionContext.kt | 42 +++++ .../alerting/script/TriggerScript.kt | 2 +- .../alerting/org.opensearch.alerting.txt | 19 +- .../org/opensearch/alerting/TestHelpers.kt | 41 +++++ .../alerting/model/WriteableTests.kt | 30 ++++ 11 files changed, 500 insertions(+), 50 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index a899e33a0..8a8f2f627 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -37,6 +37,7 @@ import org.opensearch.alerting.core.schedule.JobScheduler import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.model.BucketLevelTrigger +import org.opensearch.alerting.model.DocumentLevelTrigger import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput @@ -211,7 +212,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R SearchInput.XCONTENT_REGISTRY, DocLevelMonitorInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, - BucketLevelTrigger.XCONTENT_REGISTRY + BucketLevelTrigger.XCONTENT_REGISTRY, + DocumentLevelTrigger.XCONTENT_REGISTRY ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 31f811be1..892df1455 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -30,6 +30,7 @@ import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.AlertingConfigAccessor import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.BucketLevelTriggerRunResult +import org.opensearch.alerting.model.DocumentLevelTrigger import org.opensearch.alerting.model.Finding import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor @@ -48,6 +49,7 @@ import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput import org.opensearch.alerting.model.docLevelInput.DocLevelQuery import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT @@ -757,39 +759,119 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { return } + val count: Int = lastRunContext["shards_count"] as Int + val updatedLastRunContext = lastRunContext.toMutableMap() + for (i: Int in 0 until count) { + val shard = i.toString() + val maxSeqNo: Long = getMaxSeqNo(index, shard) + updatedLastRunContext[shard] = maxSeqNo.toString() + } + + val queryDocIds = mutableMapOf>() + val docsToQueries = mutableMapOf>() for (query in queries) { - runForEachQuery(monitor, lastRunContext, index, query) + val matchingDocIds = runForEachQuery(monitor, lastRunContext, updatedLastRunContext, index, query) + queryDocIds[query] = matchingDocIds + matchingDocIds.forEach { + if (docsToQueries.containsKey(it)) { + docsToQueries[it]?.add(query.id) + } else { + docsToQueries[it] = mutableListOf(query.id) + } + } } + + val queryIds = queries.map { it.id } + + for (trigger in monitor.triggers) { + val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger as DocumentLevelTrigger) + val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds) + + logger.info("trigger results") + logger.info(triggerResult.triggeredDocs.toString()) + + queryDocIds.forEach { + val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs) + if (queryTriggeredDocs.isNotEmpty()) { + val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger) + // TODO: check if need to create alert, if so create it and point it to FindingId + // TODO: run action as well, but this mat need to be throttled based on Mo's comment for bucket level alerting + } + } + } + + // TODO: Check for race condition against the update monitor api + // This does the update at the end in case of errors and makes sure all the queries are executed + val updatedMonitor = monitor.copy(lastRunContext = updatedLastRunContext) + // note: update has to called in serial for shards of a given index. + // make sure this is just updated for the specific query or at the end of all the queries + updateMonitor(client, xContentRegistry, settings, updatedMonitor) + } + + private fun createFindings( + monitor: Monitor, + index: String, + docLevelQuery: DocLevelQuery, + matchingDocIds: Set, + trigger: DocumentLevelTrigger + ): String { + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocId = matchingDocIds.joinToString(","), + monitorId = monitor.id, + monitorName = monitor.name, + index = index, + queryId = docLevelQuery.id, + queryTags = docLevelQuery.tags, + severity = docLevelQuery.severity, + timestamp = Instant.now(), + triggerId = trigger.id, + triggerName = trigger.name + ) + + val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() + // change this to debug. + logger.info("Findings: $findingStr") + + // todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. + val indexRequest = IndexRequest(".opensearch-alerting-findings") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(findingStr, XContentType.JSON) + + client.index(indexRequest).actionGet() + return finding.id } - private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap, index: String, query: DocLevelQuery) { + private suspend fun runForEachQuery( + monitor: Monitor, + lastRunContext: MutableMap, + updatedLastRunContext: MutableMap, + index: String, + query: DocLevelQuery + ): Set { val count: Int = lastRunContext["shards_count"] as Int + val matchingDocs = mutableSetOf() for (i: Int in 0 until count) { val shard = i.toString() try { logger.info("Monitor execution for shard: $shard") - val maxSeqNo: Long = getMaxSeqNo(index, shard) + val maxSeqNo: Long = updatedLastRunContext[shard].toString().toLong() logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") - // todo: scope to optimize this: in prev seqno and current max seq no are same don't search. val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query) logger.info("Search hits for shard_$shard is: ${hits.hits.size}") if (hits.hits.isNotEmpty()) { - createFindings(monitor, index, query, hits) + logger.info("found matches") + matchingDocs.addAll(getAllDocIds(hits)) } - - logger.info("Updating monitor: ${monitor.id}") - lastRunContext[shard] = maxSeqNo.toString() - val updatedMonitor = monitor.copy(lastRunContext = lastRunContext) - // note: update has to called in serial for shards of a given index. - updateMonitor(client, xContentRegistry, settings, updatedMonitor) } catch (e: Exception) { logger.info("Failed to run for shard $shard. Error: ${e.message}") logger.debug("Failed to run for shard $shard", e) } } + return matchingDocs } // todo: add more validations. @@ -810,7 +892,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { private fun getShardsCount(index: String): Int { val allShards: List = clusterService.state().routingTable().allShards(index) - return allShards.size + return allShards.filter { it.primary() }.size } private fun createRunContext(index: String): HashMap { @@ -854,6 +936,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { } private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits { + if (prevSeqNo?.equals(maxSeqNo) == true) { + return SearchHits.empty() + } val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) @@ -875,39 +960,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { return response.hits } - private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) { - val finding = Finding( - id = UUID.randomUUID().toString(), - relatedDocId = getAllDocIds(hits), - monitorId = monitor.id, - monitorName = monitor.name, - index = index, - queryId = docLevelQuery.id, - queryTags = docLevelQuery.tags, - severity = docLevelQuery.severity, - timestamp = Instant.now(), - triggerId = null, // todo: add once integrated with actions/triggers - triggerName = null // todo: add once integrated with actions/triggers - ) - - val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - // change this to debug. - logger.info("Findings: $findingStr") - - // todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. - val indexRequest = IndexRequest(".opensearch-alerting-findings") - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(findingStr, XContentType.JSON) - - client.index(indexRequest).actionGet() - } - - private fun getAllDocIds(hits: SearchHits): String { - var sb = StringBuilder() - for (hit in hits) { - sb.append(hit.id) - sb.append(",") - } - return sb.substring(0, sb.length - 1) + private fun getAllDocIds(hits: SearchHits): List { + return hits.map { hit -> hit.id } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index c94f0419f..ea879a42d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -12,10 +12,13 @@ import org.opensearch.alerting.model.AggregationResultBucket import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.BucketLevelTriggerRunResult +import org.opensearch.alerting.model.DocumentLevelTrigger +import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.util.getBucketKeysHash @@ -23,7 +26,6 @@ import org.opensearch.script.ScriptService import org.opensearch.search.aggregations.Aggregation import org.opensearch.search.aggregations.Aggregations import org.opensearch.search.aggregations.support.AggregationPath -import java.lang.IllegalArgumentException /** Service that handles executing Triggers */ class TriggerService(val scriptService: ScriptService) { @@ -53,6 +55,37 @@ class TriggerService(val scriptService: ScriptService) { } } + // TODO: improve performance and support match all and match any + fun runDocLevelTrigger( + monitor: Monitor, + trigger: DocumentLevelTrigger, + ctx: DocumentLevelTriggerExecutionContext, + docsToQueries: Map>, + queryIds: List + ): DocumentLevelTriggerRunResult { + return try { + val triggeredDocs = mutableListOf() + + for (doc in docsToQueries.keys) { + val params = trigger.condition.params.toMutableMap() + for (queryId in queryIds) { + params[queryId] = docsToQueries[doc]!!.contains(queryId) + } + val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(params) + .execute(ctx) + logger.info("trigger val: $triggered") + if (triggered) triggeredDocs.add(doc) + } + + DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null) + } catch (e: Exception) { + logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e) + // if the script fails we need to send an alert so set triggered = true + DocumentLevelTriggerRunResult(trigger.name, emptyList(), e) + } + } + @Suppress("UNCHECKED_CAST") fun runBucketLevelTrigger( monitor: Monitor, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt new file mode 100644 index 000000000..2cd2350a0 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.alerting.model.action.Action +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.UUIDs +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParser.Token +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.script.Script +import java.io.IOException + +/** + * A single-alert Trigger that uses Painless scripts which execute on the response of the Monitor input query to define + * alerting conditions. + */ +data class DocumentLevelTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + override val actions: List, + val condition: Script +// // which queries need to be true from the doc monitor input +// val queryIds: List +) : Trigger { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + sin.readList(::Action), // actions + Script(sin) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(DOCUMENT_LEVEL_TRIGGER_FIELD) + .field(ID_FIELD, id) + .field(NAME_FIELD, name) + .field(SEVERITY_FIELD, severity) + .startObject(CONDITION_FIELD) + .field(SCRIPT_FIELD, condition) + .endObject() +// .field(QUERY_IDS_FIELD, queryIds.toTypedArray()) + .field(ACTIONS_FIELD, actions.toTypedArray()) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return DOCUMENT_LEVEL_TRIGGER_FIELD + } + + /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf( + ID_FIELD to id, NAME_FIELD to name, SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() } + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + condition.writeTo(out) +// out.writeStringCollection(queryIds) + } + + companion object { + const val DOCUMENT_LEVEL_TRIGGER_FIELD = "document_level_trigger" + const val CONDITION_FIELD = "condition" + const val SCRIPT_FIELD = "script" + const val QUERY_IDS_FIELD = "query_ids" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Trigger::class.java, ParseField(DOCUMENT_LEVEL_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic @Throws(IOException::class) + fun parseInner(xcp: XContentParser): DocumentLevelTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + lateinit var name: String + lateinit var severity: String + lateinit var condition: Script + val queryIds: MutableList = mutableListOf() + val actions: MutableList = mutableListOf() + + if (xcp.currentToken() != Token.START_OBJECT && xcp.currentToken() != Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == Token.START_OBJECT) xcp.nextToken() + + while (xcp.currentToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + + xcp.nextToken() + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_FIELD -> { + xcp.nextToken() + condition = Script.parse(xcp) + require(condition.lang == Script.DEFAULT_SCRIPT_LANG) { + "Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]" + } + xcp.nextToken() + } + QUERY_IDS_FIELD -> { + ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_ARRAY) { + queryIds.add(xcp.text()) + } + } + ACTIONS_FIELD -> { + ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + } + xcp.nextToken() + } + + return DocumentLevelTrigger( + name = requireNotNull(name) { "Trigger name is null" }, + severity = requireNotNull(severity) { "Trigger severity is null" }, + condition = requireNotNull(condition) { "Trigger condition is null" }, + actions = requireNotNull(actions) { "Trigger actions are null" }, + id = requireNotNull(id) { "Trigger id is null." } + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): DocumentLevelTrigger { + return DocumentLevelTrigger(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt new file mode 100644 index 000000000..c5556630b --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.alerts.AlertError +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class DocumentLevelTriggerRunResult( + override var triggerName: String, + var triggeredDocs: List, + override var error: Exception?, + var actionResults: MutableMap = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggeredDocs = sin.readStringList(), + actionResults = sin.readMap() as MutableMap + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggeredDocs", triggeredDocs as List) + .field("action_results", actionResults as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeStringCollection(triggeredDocs) + out.writeMap(actionResults as Map) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return DocumentLevelTriggerRunResult(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt index e6b30415b..e3a9b12ab 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt @@ -18,6 +18,7 @@ import java.io.IOException interface Trigger : Writeable, ToXContentObject { enum class Type(val value: String) { + DOCUMENT_LEVEL_TRIGGER(DocumentLevelTrigger.DOCUMENT_LEVEL_TRIGGER_FIELD), QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD), BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD); @@ -58,6 +59,7 @@ interface Trigger : Writeable, ToXContentObject { return when (val type = sin.readEnum(Trigger.Type::class.java)) { Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin) Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin) + Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt new file mode 100644 index 000000000..318a2c712 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/DocumentLevelTriggerExecutionContext.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.script + +import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.DocumentLevelTrigger +import org.opensearch.alerting.model.Monitor +import java.time.Instant + +data class DocumentLevelTriggerExecutionContext( + override val monitor: Monitor, + val trigger: DocumentLevelTrigger, + override val results: List>, + override val periodStart: Instant, + override val periodEnd: Instant, + val alert: Alert? = null, + override val error: Exception? = null +) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { + + constructor( + monitor: Monitor, + trigger: DocumentLevelTrigger, + alert: Alert? = null + ) : this( + monitor, trigger, emptyList(), Instant.now(), Instant.now(), + alert, null + ) + + /** + * Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we + * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. + */ + override fun asTemplateArg(): Map { + val tempArg = super.asTemplateArg().toMutableMap() + tempArg["trigger"] = trigger.asTemplateArg() + tempArg["alert"] = alert?.asTemplateArg() + return tempArg + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt index a6896d004..6a737e434 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt @@ -37,7 +37,7 @@ abstract class TriggerScript(_scriptParams: Map) { * * @param ctx - the trigger execution context */ - abstract fun execute(ctx: QueryLevelTriggerExecutionContext): Boolean + abstract fun execute(ctx: TriggerExecutionContext): Boolean interface Factory { fun newInstance(scriptParams: Map): TriggerScript diff --git a/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt b/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt index 78d53e839..45f14d3e9 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt +++ b/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt @@ -5,7 +5,7 @@ class org.opensearch.alerting.script.TriggerScript { Map getParams() - boolean execute(QueryLevelTriggerExecutionContext) + boolean execute(TriggerExecutionContext) String[] PARAMETERS } @@ -31,6 +31,16 @@ class org.opensearch.alerting.script.QueryLevelTriggerExecutionContext { Exception getError() } +class org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext { + Monitor getMonitor() + DocumentLevelTrigger getTrigger() + List getResults() + java.time.Instant getPeriodStart() + java.time.Instant getPeriodEnd() + Alert getAlert() + Exception getError() +} + class org.opensearch.alerting.model.Monitor { String getId() long getVersion() @@ -45,6 +55,13 @@ class org.opensearch.alerting.model.QueryLevelTrigger { List getActions() } +class org.opensearch.alerting.model.DocumentLevelTrigger { + String getId() + String getName() + String getSeverity() + List getActions() +} + class org.opensearch.alerting.model.Alert { String getId() long getVersion() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index bd4cfa283..e457d575f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -21,6 +21,8 @@ import org.opensearch.alerting.model.AggregationResultBucket import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.BucketLevelTriggerRunResult +import org.opensearch.alerting.model.DocumentLevelTrigger +import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.Finding import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor @@ -181,6 +183,23 @@ fun randomBucketSelectorScript( return Script(Script.DEFAULT_SCRIPT_TYPE, Script.DEFAULT_SCRIPT_LANG, idOrCode, emptyMap(), params) } +fun randomDocLevelTrigger( + id: String = UUIDs.base64UUID(), + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + severity: String = "1", + condition: Script = randomScript(), + actions: List = mutableListOf(), + destinationId: String = "" +): DocumentLevelTrigger { + return DocumentLevelTrigger( + id = id, + name = name, + severity = severity, + condition = condition, + actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions + ) +} + fun randomEmailAccount( name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), email: String = OpenSearchRestTestCase.randomAlphaOfLength(5) + "@email.com", @@ -376,6 +395,21 @@ fun randomBucketLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomDocumentLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} + fun randomInputRunResults(): InputRunResults { return InputRunResults(listOf(), null) } @@ -417,6 +451,13 @@ fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult { ) } +fun randomDocumentLevelTriggerRunResult(): DocumentLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return DocumentLevelTriggerRunResult("trigger-name", mutableListOf(UUIDs.randomBase64UUID().toString()), null, map) +} + fun randomActionRunResult(): ActionRunResult { val map = mutableMapOf() map.plus(Pair("key1", "val1")) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt index 64b8c8d62..c0b1d7eb0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt @@ -17,6 +17,9 @@ import org.opensearch.alerting.randomActionRunResult import org.opensearch.alerting.randomBucketLevelMonitorRunResult import org.opensearch.alerting.randomBucketLevelTrigger import org.opensearch.alerting.randomBucketLevelTriggerRunResult +import org.opensearch.alerting.randomDocLevelTrigger +import org.opensearch.alerting.randomDocumentLevelMonitorRunResult +import org.opensearch.alerting.randomDocumentLevelTriggerRunResult import org.opensearch.alerting.randomEmailAccount import org.opensearch.alerting.randomEmailGroup import org.opensearch.alerting.randomInputRunResults @@ -107,6 +110,15 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping BucketLevelTrigger doesn't work", trigger, newTrigger) } + fun `test doc-level trigger as stream`() { + val trigger = randomDocLevelTrigger() + val out = BytesStreamOutput() + trigger.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newTrigger = DocumentLevelTrigger.readFrom(sin) + assertEquals("Round tripping DocumentLevelTrigger doesn't work", trigger, newTrigger) + } + fun `test actionrunresult as stream`() { val actionRunResult = randomActionRunResult() val out = BytesStreamOutput() @@ -134,6 +146,15 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) } + fun `test doc-level triggerrunresult as stream`() { + val runResult = randomDocumentLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = DocumentLevelTriggerRunResult(sin) + assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + } + fun `test inputrunresult as stream`() { val runResult = randomInputRunResults() val out = BytesStreamOutput() @@ -161,6 +182,15 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) } + fun `test doc-level monitorrunresult as stream`() { + val runResult = randomDocumentLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + fun `test searchinput as stream`() { val input = SearchInput(emptyList(), SearchSourceBuilder()) val out = BytesStreamOutput() From 4ba370d97802f1af45f61796cac6d169b6fa9211 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Tue, 1 Mar 2022 09:08:38 -0800 Subject: [PATCH 2/3] remove comments Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/model/DocumentLevelTrigger.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt index 2cd2350a0..1495be418 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTrigger.kt @@ -35,8 +35,6 @@ data class DocumentLevelTrigger( override val severity: String, override val actions: List, val condition: Script -// // which queries need to be true from the doc monitor input -// val queryIds: List ) : Trigger { @Throws(IOException::class) @@ -57,7 +55,6 @@ data class DocumentLevelTrigger( .startObject(CONDITION_FIELD) .field(SCRIPT_FIELD, condition) .endObject() -// .field(QUERY_IDS_FIELD, queryIds.toTypedArray()) .field(ACTIONS_FIELD, actions.toTypedArray()) .endObject() .endObject() @@ -83,7 +80,6 @@ data class DocumentLevelTrigger( out.writeString(severity) out.writeCollection(actions) condition.writeTo(out) -// out.writeStringCollection(queryIds) } companion object { From 2ea6420e3670d5cfbde8eb115b048ba6fe02267c Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Thu, 17 Mar 2022 14:10:55 -0700 Subject: [PATCH 3/3] fixes Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/MonitorRunner.kt | 81 +++++++++++-------- .../model/DocumentExecutionContext.kt | 9 +++ 2 files changed, 57 insertions(+), 33 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 892df1455..c6ea33e87 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -30,6 +30,7 @@ import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.AlertingConfigAccessor import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.BucketLevelTriggerRunResult +import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTrigger import org.opensearch.alerting.model.Finding import org.opensearch.alerting.model.InputRunResults @@ -767,37 +768,22 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { updatedLastRunContext[shard] = maxSeqNo.toString() } - val queryDocIds = mutableMapOf>() + val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() - for (query in queries) { - val matchingDocIds = runForEachQuery(monitor, lastRunContext, updatedLastRunContext, index, query) - queryDocIds[query] = matchingDocIds + val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) + queries.forEach { query -> + val matchingDocIds = runForEachQuery(docExecutionContext, query, index) + queryToDocIds[query] = matchingDocIds matchingDocIds.forEach { - if (docsToQueries.containsKey(it)) { - docsToQueries[it]?.add(query.id) - } else { - docsToQueries[it] = mutableListOf(query.id) - } + docsToQueries.putIfAbsent(it, mutableListOf()) + docsToQueries[it]?.add(query.id) } } val queryIds = queries.map { it.id } - for (trigger in monitor.triggers) { - val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger as DocumentLevelTrigger) - val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds) - - logger.info("trigger results") - logger.info(triggerResult.triggeredDocs.toString()) - - queryDocIds.forEach { - val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs) - if (queryTriggeredDocs.isNotEmpty()) { - val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger) - // TODO: check if need to create alert, if so create it and point it to FindingId - // TODO: run action as well, but this mat need to be throttled based on Mo's comment for bucket level alerting - } - } + monitor.triggers.forEach { + runForEachDocTrigger(it as DocumentLevelTrigger, monitor, docsToQueries, queryIds, queryToDocIds) } // TODO: Check for race condition against the update monitor api @@ -808,6 +794,31 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { updateMonitor(client, xContentRegistry, settings, updatedMonitor) } + private fun runForEachDocTrigger( + trigger: DocumentLevelTrigger, + monitor: Monitor, + docsToQueries: Map>, + queryIds: List, + queryToDocIds: Map> + ) { + val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) + val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds) + + logger.info("trigger results") + logger.info(triggerResult.triggeredDocs.toString()) + + val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0] + + queryToDocIds.forEach { + val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs) + if (queryTriggeredDocs.isNotEmpty()) { + val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger) + // TODO: check if need to create alert, if so create it and point it to FindingId + // TODO: run action as well, but this mat need to be throttled based on Mo's comment for bucket level alerting + } + } + } + private fun createFindings( monitor: Monitor, index: String, @@ -842,24 +853,28 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { return finding.id } - private suspend fun runForEachQuery( - monitor: Monitor, - lastRunContext: MutableMap, - updatedLastRunContext: MutableMap, - index: String, - query: DocLevelQuery + private fun runForEachQuery( + docExecutionCtx: DocumentExecutionContext, + query: DocLevelQuery, + index: String ): Set { - val count: Int = lastRunContext["shards_count"] as Int + val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int val matchingDocs = mutableSetOf() for (i: Int in 0 until count) { val shard = i.toString() try { logger.info("Monitor execution for shard: $shard") - val maxSeqNo: Long = updatedLastRunContext[shard].toString().toLong() + val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") - val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query) + val hits: SearchHits = searchShard( + index, + shard, + docExecutionCtx.lastRunContext[shard].toString().toLongOrNull(), + maxSeqNo, + query.query + ) logger.info("Search hits for shard_$shard is: ${hits.hits.size}") if (hits.hits.isNotEmpty()) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt new file mode 100644 index 000000000..db3f752c7 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt @@ -0,0 +1,9 @@ +package org.opensearch.alerting.model + +import org.opensearch.alerting.model.docLevelInput.DocLevelQuery + +data class DocumentExecutionContext( + val queries: List, + val lastRunContext: Map, + val updatedLastRunContext: Map +)