Add connection to triggers for doc level alerting #316
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,8 @@ import org.opensearch.alerting.model.Alert | |
import org.opensearch.alerting.model.AlertingConfigAccessor | ||
import org.opensearch.alerting.model.BucketLevelTrigger | ||
import org.opensearch.alerting.model.BucketLevelTriggerRunResult | ||
import org.opensearch.alerting.model.DocumentExecutionContext | ||
import org.opensearch.alerting.model.DocumentLevelTrigger | ||
import org.opensearch.alerting.model.Finding | ||
import org.opensearch.alerting.model.InputRunResults | ||
import org.opensearch.alerting.model.Monitor | ||
|
@@ -48,6 +50,7 @@ import org.opensearch.alerting.model.destination.DestinationContextFactory | |
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput | ||
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery | ||
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.TriggerExecutionContext | ||
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT | ||
|
@@ -757,39 +760,133 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | |
return | ||
} | ||
|
||
for (query in queries) { | ||
runForEachQuery(monitor, lastRunContext, index, query) | ||
val count: Int = lastRunContext["shards_count"] as Int | ||
Review comment: How do we handle a change in shard count between runs? Are there edge scenarios that might need this handling?
Reply: I created a GitHub issue for this. Also, this only happens when there is a reindexing operation, so there might still be issues during execution, since the sequence numbers will change as well. |
||
val updatedLastRunContext = lastRunContext.toMutableMap() | ||
AWSHurneyt marked this conversation as resolved.
|
||
for (i: Int in 0 until count) { | ||
val shard = i.toString() | ||
val maxSeqNo: Long = getMaxSeqNo(index, shard) | ||
updatedLastRunContext[shard] = maxSeqNo.toString() | ||
} | ||
|
||
val queryToDocIds = mutableMapOf<DocLevelQuery, Set<String>>() | ||
val docsToQueries = mutableMapOf<String, MutableList<String>>() | ||
|
||
val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) | ||
queries.forEach { query -> | ||
val matchingDocIds = runForEachQuery(docExecutionContext, query, index) | ||
queryToDocIds[query] = matchingDocIds | ||
matchingDocIds.forEach { | ||
docsToQueries.putIfAbsent(it, mutableListOf()) | ||
docsToQueries[it]?.add(query.id) | ||
} | ||
} | ||
|
||
val queryIds = queries.map { it.id } | ||
|
||
monitor.triggers.forEach { | ||
runForEachDocTrigger(it as DocumentLevelTrigger, monitor, docsToQueries, queryIds, queryToDocIds) | ||
} | ||
|
||
// TODO: Check for race condition against the update monitor api | ||
// This does the update at the end in case of errors and makes sure all the queries are executed | ||
val updatedMonitor = monitor.copy(lastRunContext = updatedLastRunContext) | ||
// note: update has to be called in serial for shards of a given index. | |
// make sure this is just updated for the specific query or at the end of all the queries | ||
updateMonitor(client, xContentRegistry, settings, updatedMonitor) | ||
Review comment: What if there is a race due to concurrent updates to the monitor, from a user-driven update versus the last-run-context update?
Reply: I will remove these comments since these have now been sorted out. Yes, there have been talks of having another index for additional metadata information, and this can help with throttling. |
||
} | ||
|
||
private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap<String, Any>, index: String, query: DocLevelQuery) { | ||
val count: Int = lastRunContext["shards_count"] as Int | ||
private fun runForEachDocTrigger( | ||
trigger: DocumentLevelTrigger, | ||
monitor: Monitor, | ||
docsToQueries: Map<String, List<String>>, | ||
queryIds: List<String>, | ||
queryToDocIds: Map<DocLevelQuery, Set<String>> | ||
) { | ||
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) | ||
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds) | ||
|
||
logger.info("trigger results") | ||
logger.info(triggerResult.triggeredDocs.toString()) | ||
|
||
val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0] | ||
|
||
queryToDocIds.forEach { | ||
val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs) | ||
if (queryTriggeredDocs.isNotEmpty()) { | ||
val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger) | ||
// TODO: check if need to create alert, if so create it and point it to FindingId | ||
// TODO: run action as well, but this may need to be throttled based on Mo's comment for bucket level alerting | |
Comment on lines +816 to +817
Review comment: Do you think it's better to cover the TODOs in issues (linked under one meta issue) and tag them here for more context? Would that ensure we close them out before the release?
Reply: Created a GitHub issue; I will create one meta GitHub issue later, which this will be included in. |
||
} | ||
} | ||
} | ||
|
||
private fun createFindings( | ||
monitor: Monitor, | ||
index: String, | ||
docLevelQuery: DocLevelQuery, | ||
matchingDocIds: Set<String>, | ||
trigger: DocumentLevelTrigger | ||
): String { | ||
val finding = Finding( | ||
id = UUID.randomUUID().toString(), | ||
relatedDocId = matchingDocIds.joinToString(","), | ||
monitorId = monitor.id, | ||
monitorName = monitor.name, | ||
index = index, | ||
queryId = docLevelQuery.id, | ||
queryTags = docLevelQuery.tags, | ||
severity = docLevelQuery.severity, | ||
timestamp = Instant.now(), | ||
triggerId = trigger.id, | ||
triggerName = trigger.name | ||
) | ||
|
||
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() | ||
// change this to debug. | ||
logger.info("Findings: $findingStr") | ||
|
||
// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. | ||
val indexRequest = IndexRequest(".opensearch-alerting-findings") | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.source(findingStr, XContentType.JSON) | ||
|
||
client.index(indexRequest).actionGet() | ||
return finding.id | ||
} | ||
|
||
private fun runForEachQuery( | ||
docExecutionCtx: DocumentExecutionContext, | ||
query: DocLevelQuery, | ||
index: String | ||
): Set<String> { | ||
val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int | ||
val matchingDocs = mutableSetOf<String>() | ||
for (i: Int in 0 until count) { | ||
val shard = i.toString() | ||
try { | ||
logger.info("Monitor execution for shard: $shard") | ||
|
||
val maxSeqNo: Long = getMaxSeqNo(index, shard) | ||
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() | ||
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") | ||
|
||
// todo: scope to optimize this: if prev seqno and current max seq no are the same, don't search. | |
val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query) | ||
val hits: SearchHits = searchShard( | ||
index, | ||
shard, | ||
docExecutionCtx.lastRunContext[shard].toString().toLongOrNull(), | ||
maxSeqNo, | ||
query.query | ||
) | ||
logger.info("Search hits for shard_$shard is: ${hits.hits.size}") | ||
|
||
if (hits.hits.isNotEmpty()) { | ||
createFindings(monitor, index, query, hits) | ||
logger.info("found matches") | ||
matchingDocs.addAll(getAllDocIds(hits)) | ||
} | ||
|
||
logger.info("Updating monitor: ${monitor.id}") | ||
lastRunContext[shard] = maxSeqNo.toString() | ||
val updatedMonitor = monitor.copy(lastRunContext = lastRunContext) | ||
// note: update has to be called in serial for shards of a given index. | |
updateMonitor(client, xContentRegistry, settings, updatedMonitor) | ||
} catch (e: Exception) { | ||
logger.info("Failed to run for shard $shard. Error: ${e.message}") | ||
logger.debug("Failed to run for shard $shard", e) | ||
} | ||
} | ||
return matchingDocs | ||
} | ||
|
||
// todo: add more validations. | ||
|
@@ -810,7 +907,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | |
|
||
private fun getShardsCount(index: String): Int { | ||
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index) | ||
return allShards.size | ||
return allShards.filter { it.primary() }.size | ||
} | ||
|
||
private fun createRunContext(index: String): HashMap<String, Any> { | ||
|
@@ -854,6 +951,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | |
} | ||
|
||
private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits { | ||
if (prevSeqNo?.equals(maxSeqNo) == true) { | ||
return SearchHits.empty() | ||
} | ||
val boolQueryBuilder = BoolQueryBuilder() | ||
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) | ||
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) | ||
|
@@ -875,39 +975,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | |
return response.hits | ||
} | ||
|
||
private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) { | ||
val finding = Finding( | ||
id = UUID.randomUUID().toString(), | ||
relatedDocId = getAllDocIds(hits), | ||
monitorId = monitor.id, | ||
monitorName = monitor.name, | ||
index = index, | ||
queryId = docLevelQuery.id, | ||
queryTags = docLevelQuery.tags, | ||
severity = docLevelQuery.severity, | ||
timestamp = Instant.now(), | ||
triggerId = null, // todo: add once integrated with actions/triggers | ||
triggerName = null // todo: add once integrated with actions/triggers | ||
) | ||
|
||
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() | ||
// change this to debug. | ||
logger.info("Findings: $findingStr") | ||
|
||
// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. | ||
val indexRequest = IndexRequest(".opensearch-alerting-findings") | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.source(findingStr, XContentType.JSON) | ||
|
||
client.index(indexRequest).actionGet() | ||
} | ||
|
||
private fun getAllDocIds(hits: SearchHits): String { | ||
var sb = StringBuilder() | ||
for (hit in hits) { | ||
sb.append(hit.id) | ||
sb.append(",") | ||
} | ||
return sb.substring(0, sb.length - 1) | ||
private fun getAllDocIds(hits: SearchHits): List<String> { | ||
return hits.map { hit -> hit.id } | ||
} | ||
} |
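On the shard-count question raised in the review thread above, one possible way to handle a change between runs is sketched below. This is a hypothetical helper, not part of this PR: the name `reconcileRunContext` and the reset strategy are assumptions, and the only thing taken from the diff is the `shards_count` key and the per-shard string keys of the run context. When the stored count no longer matches the current primary shard count, the sketch drops the stored per-shard sequence numbers, so the caller ends up with a null previous sequence number for every shard.

```kotlin
// Hypothetical helper, not part of this PR: reconcile a stored run context
// with the current primary shard count of the index.
fun reconcileRunContext(
    lastRunContext: Map<String, Any>,
    currentShardCount: Int
): Map<String, Any> {
    val storedCount = lastRunContext["shards_count"] as? Int
    if (storedCount == currentShardCount) return lastRunContext

    // The shard count changed (or was never recorded), so the stored per-shard
    // sequence numbers no longer describe the same shards. Drop them and keep
    // only the new count; the caller then sees no previous seq no per shard.
    return mapOf("shards_count" to currentShardCount)
}

fun main() {
    val stored = mapOf<String, Any>("shards_count" to 2, "0" to "41", "1" to "17")
    println(reconcileRunContext(stored, 2)) // same count: context is returned as-is
    println(reconcileRunContext(stored, 3)) // different count: per-shard entries are dropped
}
```

The trade-off of a full reset is that documents indexed before the change may be searched again, which could produce duplicate findings; whether that is acceptable is a design decision for the linked issue.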
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,18 +12,20 @@ import org.opensearch.alerting.model.AggregationResultBucket | |
import org.opensearch.alerting.model.Alert | ||
import org.opensearch.alerting.model.BucketLevelTrigger | ||
import org.opensearch.alerting.model.BucketLevelTriggerRunResult | ||
import org.opensearch.alerting.model.DocumentLevelTrigger | ||
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult | ||
import org.opensearch.alerting.model.Monitor | ||
import org.opensearch.alerting.model.QueryLevelTrigger | ||
import org.opensearch.alerting.model.QueryLevelTriggerRunResult | ||
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.TriggerScript | ||
import org.opensearch.alerting.util.getBucketKeysHash | ||
import org.opensearch.script.ScriptService | ||
import org.opensearch.search.aggregations.Aggregation | ||
import org.opensearch.search.aggregations.Aggregations | ||
import org.opensearch.search.aggregations.support.AggregationPath | ||
import java.lang.IllegalArgumentException | ||
|
||
/** Service that handles executing Triggers */ | ||
class TriggerService(val scriptService: ScriptService) { | ||
|
@@ -53,6 +55,37 @@ class TriggerService(val scriptService: ScriptService) { | |
} | ||
} | ||
|
||
// TODO: improve performance and support match all and match any | ||
fun runDocLevelTrigger( | ||
monitor: Monitor, | ||
trigger: DocumentLevelTrigger, | ||
ctx: DocumentLevelTriggerExecutionContext, | ||
docsToQueries: Map<String, List<String>>, | ||
queryIds: List<String> | ||
): DocumentLevelTriggerRunResult { | ||
return try { | ||
val triggeredDocs = mutableListOf<String>() | ||
|
||
for (doc in docsToQueries.keys) { | ||
val params = trigger.condition.params.toMutableMap() | ||
for (queryId in queryIds) { | ||
params[queryId] = docsToQueries[doc]!!.contains(queryId) | ||
} | ||
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) | ||
.newInstance(params) | ||
.execute(ctx) | ||
logger.info("trigger val: $triggered") | ||
if (triggered) triggeredDocs.add(doc) | ||
} | ||
|
||
DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null) | ||
} catch (e: Exception) { | ||
Comment on lines +66 to +82
Review comment: Instead of Document -> Query, can we do Query -> Document, since the number of documents will be fairly large compared to the number of rules? It would also simplify match-all cases, since they boil down to finding the common ids across multiple lists. Thoughts?
Reply: This is still in discussion, and we will have a path forward in a future PR. |
||
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e) | ||
// if the script fails we need to send an alert so set triggered = true | ||
DocumentLevelTriggerRunResult(trigger.name, emptyList(), e) | ||
} | ||
} | ||
|
||
@Suppress("UNCHECKED_CAST") | ||
fun runBucketLevelTrigger( | ||
monitor: Monitor, | ||
|
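The Query -> Document suggestion from the review thread on `runDocLevelTrigger` can be illustrated with a small sketch. It is not the PR's implementation and does not involve the script service: the functions `matchAny` and `matchAll` are hypothetical, and plain query-id strings stand in for the `DocLevelQuery` keys of `queryToDocIds`. "Match all" becomes the intersection of the per-query document id sets, and "match any" becomes their union.

```kotlin
// Hypothetical sketch, not the PR's implementation: evaluate "match any" and
// "match all" directly on a query id -> matching document ids map.

// Documents matched by at least one query: union of all id sets.
fun matchAny(queryToDocIds: Map<String, Set<String>>): Set<String> =
    queryToDocIds.values.flatten().toSet()

// Documents matched by every query: intersection of all id sets.
// With no queries at all, nothing is considered matched.
fun matchAll(queryToDocIds: Map<String, Set<String>>): Set<String> =
    queryToDocIds.values.reduceOrNull { acc, ids -> acc intersect ids } ?: emptySet()

fun main() {
    val queryToDocIds = mapOf(
        "q1" to setOf("doc1", "doc2"),
        "q2" to setOf("doc2", "doc3")
    )
    println(matchAny(queryToDocIds)) // ids matched by q1 or q2
    println(matchAll(queryToDocIds)) // ids matched by both q1 and q2
}
```

The work here scales with the number of queries and the size of their result sets rather than with one script execution per document, which is the point the reviewer raises.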
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package org.opensearch.alerting.model | ||
|
||
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery | ||
|
||
data class DocumentExecutionContext( | ||
val queries: List<DocLevelQuery>, | ||
val lastRunContext: Map<String, Any>, | ||
val updatedLastRunContext: Map<String, Any> | ||
) |
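As an illustration of how the two maps in this context relate, the sketch below derives, for each shard, the sequence-number window that a run would search. It uses a simplified stand-in for `DocumentExecutionContext` (the `queries` field is omitted so the example is self-contained), and the helper `seqNoWindows` is hypothetical. The assumptions taken from the diff are that `shards_count` holds the shard count and that each shard id maps to a sequence number stored as a string.

```kotlin
// Simplified stand-in for the PR's DocumentExecutionContext (queries omitted).
data class ShardSeqNoContext(
    val lastRunContext: Map<String, Any>,
    val updatedLastRunContext: Map<String, Any>
)

// Hypothetical helper: for each shard, the (prevSeqNo, maxSeqNo] window to search.
// A null prevSeqNo means no sequence number was stored for that shard.
fun seqNoWindows(ctx: ShardSeqNoContext): Map<String, Pair<Long?, Long>> {
    val count = ctx.lastRunContext["shards_count"] as Int
    val windows = mutableMapOf<String, Pair<Long?, Long>>()
    for (i in 0 until count) {
        val shard = i.toString()
        val prev = ctx.lastRunContext[shard]?.toString()?.toLongOrNull()
        // Skip shards for which no current max sequence number was captured.
        val max = ctx.updatedLastRunContext[shard]?.toString()?.toLongOrNull() ?: continue
        // Nothing new was indexed on this shard since the last run.
        if (prev == max) continue
        windows[shard] = prev to max
    }
    return windows
}

fun main() {
    val ctx = ShardSeqNoContext(
        lastRunContext = mapOf("shards_count" to 3, "0" to "10", "1" to "5"),
        updatedLastRunContext = mapOf("shards_count" to 3, "0" to "10", "1" to "9", "2" to "4")
    )
    // Shard 0 is skipped (no change); shards 1 and 2 get a window.
    println(seqNoWindows(ctx))
}
```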
Review comment: Do you think it's time we break MonitorRunner into multiple concrete implementations for the different types of alerting, so that responsibilities are segregated? This class is growing huge. It would also make it simpler to start adding tests with a dedicated scope.
Reply: Yes, I agree, and I have a task for refactoring to help clean up MonitorRunner.
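To make the refactoring idea concrete, here is a minimal sketch of one possible shape for it. Every name in it (`MonitorType`, `MonitorStub`, `MonitorTypeRunner`, `MonitorRunnerDispatcher`, and the three runner classes) is hypothetical and chosen for illustration; none of them comes from this PR or describes the planned refactor. The idea is one small runner per monitor type behind a common interface, with a dispatcher that picks the runner by type.

```kotlin
// Hypothetical types for illustration only; none of these exist in the PR.
enum class MonitorType { QUERY_LEVEL, BUCKET_LEVEL, DOC_LEVEL }

data class MonitorStub(val id: String, val type: MonitorType)

// One implementation per monitor type keeps each responsibility separate
// and gives each runner its own narrow test surface.
interface MonitorTypeRunner {
    fun run(monitor: MonitorStub): String
}

class QueryLevelRunner : MonitorTypeRunner {
    override fun run(monitor: MonitorStub): String = "query-level run of ${monitor.id}"
}

class BucketLevelRunner : MonitorTypeRunner {
    override fun run(monitor: MonitorStub): String = "bucket-level run of ${monitor.id}"
}

class DocLevelRunner : MonitorTypeRunner {
    override fun run(monitor: MonitorStub): String = "doc-level run of ${monitor.id}"
}

// The dispatcher only selects a runner; it holds no type-specific logic.
class MonitorRunnerDispatcher(private val runners: Map<MonitorType, MonitorTypeRunner>) {
    fun run(monitor: MonitorStub): String {
        val runner = runners[monitor.type]
            ?: throw IllegalArgumentException("No runner registered for ${monitor.type}")
        return runner.run(monitor)
    }
}

fun main() {
    val dispatcher = MonitorRunnerDispatcher(
        mapOf(
            MonitorType.QUERY_LEVEL to QueryLevelRunner(),
            MonitorType.BUCKET_LEVEL to BucketLevelRunner(),
            MonitorType.DOC_LEVEL to DocLevelRunner()
        )
    )
    println(dispatcher.run(MonitorStub("m1", MonitorType.DOC_LEVEL)))
}
```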