Skip to content

Commit

Permalink
Add connection to triggers for doc level alerting
Browse files Browse the repository at this point in the history
  • Loading branch information
lezzago committed Mar 1, 2022
1 parent 5500451 commit 6b8c55b
Show file tree
Hide file tree
Showing 11 changed files with 500 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
Expand Down Expand Up @@ -211,7 +212,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
BucketLevelTrigger.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
)
}

Expand Down
145 changes: 99 additions & 46 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
Expand All @@ -48,6 +49,7 @@ import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
Expand Down Expand Up @@ -757,39 +759,119 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return
}

val count: Int = lastRunContext["shards_count"] as Int
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
}

val queryDocIds = mutableMapOf<DocLevelQuery, Set<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
for (query in queries) {
runForEachQuery(monitor, lastRunContext, index, query)
val matchingDocIds = runForEachQuery(monitor, lastRunContext, updatedLastRunContext, index, query)
queryDocIds[query] = matchingDocIds
matchingDocIds.forEach {
if (docsToQueries.containsKey(it)) {
docsToQueries[it]?.add(query.id)
} else {
docsToQueries[it] = mutableListOf(query.id)
}
}
}

val queryIds = queries.map { it.id }

for (trigger in monitor.triggers) {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger as DocumentLevelTrigger)
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)

logger.info("trigger results")
logger.info(triggerResult.triggeredDocs.toString())

queryDocIds.forEach {
val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs)
if (queryTriggeredDocs.isNotEmpty()) {
val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger)
// TODO: check if need to create alert, if so create it and point it to FindingId
// TODO: run action as well, but this mat need to be throttled based on Mo's comment for bucket level alerting
}
}
}

// TODO: Check for race condition against the update monitor api
// This does the update at the end in case of errors and makes sure all the queries are executed
val updatedMonitor = monitor.copy(lastRunContext = updatedLastRunContext)
// note: update has to called in serial for shards of a given index.
// make sure this is just updated for the specific query or at the end of all the queries
updateMonitor(client, xContentRegistry, settings, updatedMonitor)
}

private fun createFindings(
monitor: Monitor,
index: String,
docLevelQuery: DocLevelQuery,
matchingDocIds: Set<String>,
trigger: DocumentLevelTrigger
): String {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = matchingDocIds.joinToString(","),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = trigger.id,
triggerName = trigger.name
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
// change this to debug.
logger.info("Findings: $findingStr")

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)

client.index(indexRequest).actionGet()
return finding.id
}

private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap<String, Any>, index: String, query: DocLevelQuery) {
private suspend fun runForEachQuery(
monitor: Monitor,
lastRunContext: MutableMap<String, Any>,
updatedLastRunContext: MutableMap<String, Any>,
index: String,
query: DocLevelQuery
): Set<String> {
val count: Int = lastRunContext["shards_count"] as Int
val matchingDocs = mutableSetOf<String>()
for (i: Int in 0 until count) {
val shard = i.toString()
try {
logger.info("Monitor execution for shard: $shard")

val maxSeqNo: Long = getMaxSeqNo(index, shard)
val maxSeqNo: Long = updatedLastRunContext[shard].toString().toLong()
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

// todo: scope to optimize this: in prev seqno and current max seq no are same don't search.
val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query)
logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

if (hits.hits.isNotEmpty()) {
createFindings(monitor, index, query, hits)
logger.info("found matches")
matchingDocs.addAll(getAllDocIds(hits))
}

logger.info("Updating monitor: ${monitor.id}")
lastRunContext[shard] = maxSeqNo.toString()
val updatedMonitor = monitor.copy(lastRunContext = lastRunContext)
// note: update has to called in serial for shards of a given index.
updateMonitor(client, xContentRegistry, settings, updatedMonitor)
} catch (e: Exception) {
logger.info("Failed to run for shard $shard. Error: ${e.message}")
logger.debug("Failed to run for shard $shard", e)
}
}
return matchingDocs
}

// todo: add more validations.
Expand All @@ -810,7 +892,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

private fun getShardsCount(index: String): Int {
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index)
return allShards.size
return allShards.filter { it.primary() }.size
}

private fun createRunContext(index: String): HashMap<String, Any> {
Expand Down Expand Up @@ -854,6 +936,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}

private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true) {
return SearchHits.empty()
}
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
Expand All @@ -875,39 +960,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return response.hits
}

private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = getAllDocIds(hits),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = null, // todo: add once integrated with actions/triggers
triggerName = null // todo: add once integrated with actions/triggers
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
// change this to debug.
logger.info("Findings: $findingStr")

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)

client.index(indexRequest).actionGet()
}

private fun getAllDocIds(hits: SearchHits): String {
var sb = StringBuilder()
for (hit in hits) {
sb.append(hit.id)
sb.append(",")
}
return sb.substring(0, sb.length - 1)
private fun getAllDocIds(hits: SearchHits): List<String> {
return hits.map { hit -> hit.id }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.lang.IllegalArgumentException

/** Service that handles executing Triggers */
class TriggerService(val scriptService: ScriptService) {
Expand Down Expand Up @@ -53,6 +55,37 @@ class TriggerService(val scriptService: ScriptService) {
}
}

// TODO: improve performance and support match all and match any
fun runDocLevelTrigger(
monitor: Monitor,
trigger: DocumentLevelTrigger,
ctx: DocumentLevelTriggerExecutionContext,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>
): DocumentLevelTriggerRunResult {
return try {
val triggeredDocs = mutableListOf<String>()

for (doc in docsToQueries.keys) {
val params = trigger.condition.params.toMutableMap()
for (queryId in queryIds) {
params[queryId] = docsToQueries[doc]!!.contains(queryId)
}
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(params)
.execute(ctx)
logger.info("trigger val: $triggered")
if (triggered) triggeredDocs.add(doc)
}

DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
DocumentLevelTriggerRunResult(trigger.name, emptyList(), e)
}
}

@Suppress("UNCHECKED_CAST")
fun runBucketLevelTrigger(
monitor: Monitor,
Expand Down
Loading

0 comments on commit 6b8c55b

Please sign in to comment.