Add connection to triggers for doc level alerting #316

Merged
Changes from 2 commits
AlertingPlugin.kt

@@ -37,6 +37,7 @@ import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
+import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
@@ -211,7 +212,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
-                BucketLevelTrigger.XCONTENT_REGISTRY
+                BucketLevelTrigger.XCONTENT_REGISTRY,
+                DocumentLevelTrigger.XCONTENT_REGISTRY
)
}

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt (145 changes: 99 additions & 46 deletions)

@@ -30,6 +30,7 @@ import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
+import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
@@ -48,6 +49,7 @@ import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
+import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
@@ -757,39 +759,119 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return
}

+        val count: Int = lastRunContext["shards_count"] as Int
[Comment] Member: Do you think it's time we break MonitorRunner into multiple concrete implementations for the different types of alerting, to make sure we are segregating responsibilities? This class is growing huge. It would also simplify adding tests with a dedicated scope.

[Reply] Member (Author): Yes, I agree, and I have a task for refactoring to help clean up MonitorRunner.

[Comment] Member: How do we handle a change in shard count between runs? Are there edge scenarios that might need this handling?

[Reply] Member (Author): I created a GitHub issue regarding this. Also, this only happens when there is a reindexing operation, so there might still be issues during executions, since the sequence numbers will be changed as well.

+        val updatedLastRunContext = lastRunContext.toMutableMap()
+        for (i: Int in 0 until count) {
+            val shard = i.toString()
+            val maxSeqNo: Long = getMaxSeqNo(index, shard)
+            updatedLastRunContext[shard] = maxSeqNo.toString()
+        }

+        val queryDocIds = mutableMapOf<DocLevelQuery, Set<String>>()

[Comment] Member: queryDocIds --> queryToDocIds?

+        val docsToQueries = mutableMapOf<String, MutableList<String>>()

[Comment] Member: docsToQueries -> docIdsToQueries

        for (query in queries) {
-            runForEachQuery(monitor, lastRunContext, index, query)
+            val matchingDocIds = runForEachQuery(monitor, lastRunContext, updatedLastRunContext, index, query)
+            queryDocIds[query] = matchingDocIds
+            matchingDocIds.forEach {
+                if (docsToQueries.containsKey(it)) {
+                    docsToQueries[it]?.add(query.id)
+                } else {
+                    docsToQueries[it] = mutableListOf(query.id)
+                }
+            }
[Comment] Member: Can we simplify this by passing a supplier function to runForEachQuery that already has the matched document IDs? I am again trying to put into perspective how runForEachQuery could be executed concurrently.

[Reply] Member (Author): When I tried adding a supplier, the code started to seem more complex, and I noticed we would need to change the threads the plugin runs on and add extra permissions for the plugin to parallelize across multiple threads in the cluster. We can do this as a separate PR.
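
[Note] As a side observation, the doc-id accumulation above could also be expressed with Kotlin's getOrPut; a minimal sketch reusing the PR's variable names (an illustration, not code from this PR):

    // Hypothetical simplification of the accumulation loop above (not in the PR).
    matchingDocIds.forEach { docId ->
        // getOrPut creates the list on first sight of a doc id, then appends the query id.
        docsToQueries.getOrPut(docId) { mutableListOf() }.add(query.id)
    }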

        }

+        val queryIds = queries.map { it.id }
+
+        for (trigger in monitor.triggers) {
+            val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger as DocumentLevelTrigger)
+            val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)
+
+            logger.info("trigger results")
+            logger.info(triggerResult.triggeredDocs.toString())
+
+            queryDocIds.forEach {
+                val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs)
+                if (queryTriggeredDocs.isNotEmpty()) {
+                    val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger)
+                    // TODO: check if we need to create an alert; if so, create it and point it to the findingId
+                    // TODO: run actions as well, but this may need to be throttled based on Mo's comment for bucket-level alerting
+                }
+            }
[Comment] Member: Abstract this out as a separate runForEachTrigger. Again, to achieve higher parallelism we can expect each trigger execution to return its findings; here we may only want to consolidate them to create the final findings. Thoughts?

[Reply] Member (Author): I have separated the function out for this. I am also not sure there is any benefit to consolidating all the findings at the end, since we just want to run a single action based on each finding.
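
[Note] A minimal sketch of what that extraction could look like; the signature and names here are assumptions, not the code that landed in the revised PR:

    // Hypothetical per-trigger helper mirroring the loop body above.
    private suspend fun runForEachTrigger(
        monitor: Monitor,
        trigger: DocumentLevelTrigger,
        queryDocIds: Map<DocLevelQuery, Set<String>>,
        docsToQueries: Map<String, List<String>>,
        queryIds: List<String>,
        index: String
    ) {
        val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
        val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)
        queryDocIds.forEach { (query, docIds) ->
            // Only create findings for queries whose matches actually triggered.
            val queryTriggeredDocs = docIds.intersect(triggerResult.triggeredDocs)
            if (queryTriggeredDocs.isNotEmpty()) {
                createFindings(monitor, index, query, queryTriggeredDocs, trigger)
            }
        }
    }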

+        }

+        // TODO: Check for race conditions against the update monitor API
+        // This does the update at the end in case of errors, and makes sure all the queries are executed
+        val updatedMonitor = monitor.copy(lastRunContext = updatedLastRunContext)
+        // note: the update has to be called serially for the shards of a given index.
+        // make sure this is updated just for the specific query, or at the end of all the queries
+        updateMonitor(client, xContentRegistry, settings, updatedMonitor)
[Comment] Member: What if there are races due to concurrent updates to the monitor, i.e. a user-driven update vs. a last-run-context update? I see value in segregating the monitor configuration information from the state information. Thoughts?

[Reply] Member (Author): I will remove these comments since they have now been sorted out. Yes, there have been talks of having another index for additional metadata information, and that could also help with throttling.

}

+    private fun createFindings(
+        monitor: Monitor,
+        index: String,
+        docLevelQuery: DocLevelQuery,
+        matchingDocIds: Set<String>,
+        trigger: DocumentLevelTrigger
+    ): String {
+        val finding = Finding(
+            id = UUID.randomUUID().toString(),
+            relatedDocId = matchingDocIds.joinToString(","),
+            monitorId = monitor.id,
+            monitorName = monitor.name,
+            index = index,
+            queryId = docLevelQuery.id,
+            queryTags = docLevelQuery.tags,
+            severity = docLevelQuery.severity,
+            timestamp = Instant.now(),
+            triggerId = trigger.id,
+            triggerName = trigger.name
+        )
+
+        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
+        // change this to debug.
+        logger.info("Findings: $findingStr")
+
+        // todo: below is all hardcoded temp code, added only to test. Replace this with proper Findings index lifecycle management.
+        val indexRequest = IndexRequest(".opensearch-alerting-findings")
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .source(findingStr, XContentType.JSON)
+
+        client.index(indexRequest).actionGet()
+        return finding.id
+    }

-    private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap<String, Any>, index: String, query: DocLevelQuery) {
+    private suspend fun runForEachQuery(
+        monitor: Monitor,
+        lastRunContext: MutableMap<String, Any>,
+        updatedLastRunContext: MutableMap<String, Any>,
+        index: String,
+        query: DocLevelQuery
+    ): Set<String> {
        val count: Int = lastRunContext["shards_count"] as Int
+        val matchingDocs = mutableSetOf<String>()
        for (i: Int in 0 until count) {
            val shard = i.toString()
            try {
                logger.info("Monitor execution for shard: $shard")

-                val maxSeqNo: Long = getMaxSeqNo(index, shard)
+                val maxSeqNo: Long = updatedLastRunContext[shard].toString().toLong()
[Comment] Member: I like the idea that we are pre-caching the last-run context per shard, as this allows a uniform run time across the shards, since it is essentially a sequential run among them. However, I see an opportunity to create an execution object that abstracts all the information needed to run the queries per shard, such as the queries, lastRunContext, updatedLastRunContext, and matchingDocs, under a single wrapper. Think of it as a state object that you can pass to a runnable for asynchronous parallel executions. We will need this in the very near future, and it will help make the flow logic of this method stateless.

[Reply] Member (Author): I added a new object, but note that it will be modified further as we optimize the flow and make use of it. I didn't see any benefit to adding matchingDocs, so that was not included.
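
[Note] A minimal sketch of such a per-shard execution state object; all names here are assumptions for illustration, not the object actually added in the revised PR:

    // Hypothetical wrapper bundling the per-shard state discussed above.
    data class ShardRunContext(
        val index: String,
        val shard: String,
        val prevSeqNo: Long?,   // last sequence number processed in the previous run
        val maxSeqNo: Long,     // pre-cached upper bound for this run
        val query: DocLevelQuery
    )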

logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

                // todo: scope to optimize this: if the prev seq no and the current max seq no are the same, don't search.
val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query)
[Comment] Member: Handle the case where the current sequence number is less than the previous one? It possibly also needs initial handling when the contexts are created.

[Reply] Member (Author): Fail fast, or return empty, for cases where there are merges and the sequence numbers are messed up. That way the next monitor execution will work correctly.
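
[Note] A minimal sketch of that fail-fast guard, as an illustration of the approach described rather than code from this PR:

    // Hypothetical guard inside the shard loop: if seq_no bookkeeping went backwards
    // (e.g. after a reindex or merge), skip this shard so the next run starts from a refreshed context.
    val prevSeqNo = lastRunContext[shard].toString().toLongOrNull()
    if (prevSeqNo != null && prevSeqNo > maxSeqNo) {
        logger.info("Skipping shard $shard: prevSeqNo ($prevSeqNo) > maxSeqNo ($maxSeqNo)")
        continue
    }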

logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

if (hits.hits.isNotEmpty()) {
createFindings(monitor, index, query, hits)
logger.info("found matches")
matchingDocs.addAll(getAllDocIds(hits))
}

logger.info("Updating monitor: ${monitor.id}")
lastRunContext[shard] = maxSeqNo.toString()
val updatedMonitor = monitor.copy(lastRunContext = lastRunContext)
// note: update has to called in serial for shards of a given index.
updateMonitor(client, xContentRegistry, settings, updatedMonitor)
} catch (e: Exception) {
logger.info("Failed to run for shard $shard. Error: ${e.message}")
logger.debug("Failed to run for shard $shard", e)
}
}
return matchingDocs
}

// todo: add more validations.
@@ -810,7 +892,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

private fun getShardsCount(index: String): Int {
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index)
-        return allShards.size
+        return allShards.filter { it.primary() }.size
}

private fun createRunContext(index: String): HashMap<String, Any> {
@@ -854,6 +936,9 @@
}

private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits {
+        if (prevSeqNo?.equals(maxSeqNo) == true) {
+            return SearchHits.empty()
+        }
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
@@ -875,39 +960,7 @@
return response.hits
}

-    private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) {
-        val finding = Finding(
-            id = UUID.randomUUID().toString(),
-            relatedDocId = getAllDocIds(hits),
-            monitorId = monitor.id,
-            monitorName = monitor.name,
-            index = index,
-            queryId = docLevelQuery.id,
-            queryTags = docLevelQuery.tags,
-            severity = docLevelQuery.severity,
-            timestamp = Instant.now(),
-            triggerId = null, // todo: add once integrated with actions/triggers
-            triggerName = null // todo: add once integrated with actions/triggers
-        )
-
-        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
-        // change this to debug.
-        logger.info("Findings: $findingStr")
-
-        // todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
-        val indexRequest = IndexRequest(".opensearch-alerting-findings")
-            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-            .source(findingStr, XContentType.JSON)
-
-        client.index(indexRequest).actionGet()
-    }
-
-    private fun getAllDocIds(hits: SearchHits): String {
-        var sb = StringBuilder()
-        for (hit in hits) {
-            sb.append(hit.id)
-            sb.append(",")
-        }
-        return sb.substring(0, sb.length - 1)
+    private fun getAllDocIds(hits: SearchHits): List<String> {
+        return hits.map { hit -> hit.id }
}
}
TriggerService.kt

@@ -12,18 +12,20 @@ import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
+import org.opensearch.alerting.model.DocumentLevelTrigger
+import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
+import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.lang.IllegalArgumentException

/** Service that handles executing Triggers */
class TriggerService(val scriptService: ScriptService) {
@@ -53,6 +55,37 @@ class TriggerService(val scriptService: ScriptService) {
}
}

+    // TODO: improve performance and support match-all and match-any
+    fun runDocLevelTrigger(
+        monitor: Monitor,
+        trigger: DocumentLevelTrigger,
+        ctx: DocumentLevelTriggerExecutionContext,
+        docsToQueries: Map<String, List<String>>,
+        queryIds: List<String>
+    ): DocumentLevelTriggerRunResult {
+        return try {
+            val triggeredDocs = mutableListOf<String>()
+
+            for (doc in docsToQueries.keys) {
+                val params = trigger.condition.params.toMutableMap()
+                for (queryId in queryIds) {
+                    params[queryId] = docsToQueries[doc]!!.contains(queryId)
+                }
+                val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
+                    .newInstance(params)
+                    .execute(ctx)
+                logger.info("trigger val: $triggered")
+                if (triggered) triggeredDocs.add(doc)
+            }
+
+            DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null)
+        } catch (e: Exception) {
[Comment on lines +66 to +82] Member: Instead of Document -> Query, can we do Query -> Document? The number of documents will be fairly large compared to the number of rules. It would also simplify the match-all case, since that boils down to finding the common IDs across multiple lists. Thoughts?

[Reply] Member (Author): This is still in discussion, and we will have a path forward in a future PR. (See the sketch after this function.)

logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
DocumentLevelTriggerRunResult(trigger.name, emptyList(), e)
}
}
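
[Note] A minimal sketch of the Query -> Document direction discussed above, as an illustration of the idea rather than code from this PR: match-any becomes a union of the per-query doc-ID sets, and match-all reduces to their intersection.

    // Hypothetical inversion: queryToDocIds maps each query id to the set of matching doc ids.
    fun matchAllDocIds(queryToDocIds: Map<String, Set<String>>): Set<String> =
        queryToDocIds.values.reduceOrNull { acc, ids -> acc.intersect(ids) } ?: emptySet()

    fun matchAnyDocIds(queryToDocIds: Map<String, Set<String>>): Set<String> =
        queryToDocIds.values.flatten().toSet()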

@Suppress("UNCHECKED_CAST")
fun runBucketLevelTrigger(
monitor: Monitor,