POC for doc-level-alerting #277

Merged
alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
@@ -39,6 +39,7 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteDestinationAction
import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction
@@ -208,6 +209,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
return listOf(
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
)
203 changes: 203 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
@@ -14,17 +14,23 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.elasticapi.InjectorContextElement
import org.opensearch.alerting.elasticapi.retry
import org.opensearch.alerting.elasticapi.string
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorRunResult
@@ -39,6 +45,8 @@ import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
@@ -60,17 +68,32 @@ import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isBucketLevelMonitor
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.alerting.util.updateMonitor
import org.opensearch.client.Client
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.Strings
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import org.opensearch.threadpool.ThreadPool
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.coroutines.CoroutineContext

object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
@@ -247,6 +270,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
launch {
if (job.isBucketLevelMonitor()) {
runBucketLevelMonitor(job, periodStart, periodEnd)
} else if (job.isDocLevelMonitor()) {
runDocLevelMonitor(job, periodStart, periodEnd)
} else {
runQueryLevelMonitor(job, periodStart, periodEnd)
}
@@ -707,4 +732,182 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

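/**
 * Entry point for document-level monitor execution. Validates the monitor, resolves (or creates)
 * the per-shard lastRunContext, and then runs every configured query against the monitored index.
 * The dryrun flag is accepted but not yet used in this POC.
 */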
private suspend fun runDocLevelMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false) {

logger.info("Document-level-monitor is running ...")
try {
validate(monitor)
} catch (e: Exception) {
logger.info("Failed to start Document-level-monitor. Error: ${e.message}")
return
}

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

var lastRunContext = monitor.lastRunContext.toMutableMap()
try {
if (lastRunContext.isEmpty()) {
lastRunContext = createRunContext(index).toMutableMap()
}
} catch (e: Exception) {
logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}")
return
}

for (query in queries) {
runForEachQuery(monitor, lastRunContext, index, query)
}
}

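/**
 * Runs a single query against every shard of the index. For each shard, the current max seq_no is
 * fetched and all documents in the range (last recorded seq_no, max seq_no] are matched against the
 * query; any hits are persisted as Findings and the shard's seq_no checkpoint is then advanced.
 */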
private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap<String, Any>, index: String, query: DocLevelQuery) {
val count: Int = lastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
try {
logger.info("Monitor execution for shard: $shard")

val maxSeqNo: Long = getMaxSeqNo(index, shard)
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

// todo: optimization: if the previous seq_no and the current max seq_no are the same, skip the search.
val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query)
logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

if (hits.hits.isNotEmpty()) {
createFindings(monitor, index, query, hits)
}

logger.info("Updating monitor: ${monitor.id}")
lastRunContext[shard] = maxSeqNo.toString()
val updatedMonitor = monitor.copy(lastRunContext = lastRunContext)
// note: the update has to be called serially for the shards of a given index.

Review comment (Member): Why is this the case? I thought a monitor's execution is not done in parallel, so couldn't we do the update call after executing on all shards?

updateMonitor(client, xContentRegistry, settings, updatedMonitor)
} catch (e: Exception) {
logger.info("Failed to run for shard $shard. Error: ${e.message}")
logger.debug("Failed to run for shard $shard", e)
}
}
}

// todo: add more validations.
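/**
 * Validates that the monitor has exactly one input, that the input is a DocLevelMonitorInput, and
 * that it targets a single index, which is all this POC supports.
 */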
private fun validate(monitor: Monitor) {
if (monitor.inputs.size > 1) {
throw IOException("Only one input is supported with document-level-monitor.")
}

if (monitor.inputs[0].name() != DocLevelMonitorInput.DOC_LEVEL_INPUT_FIELD) {
throw IOException("Invalid input with document-level-monitor.")
}

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
if (docLevelMonitorInput.indices.size > 1) {
throw IOException("Only one index is supported with document-level-monitor.")
}
}

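/**
 * Returns the number of shard routings for the index. Note: allShards() also returns replica
 * routings, so using this as a count of shard ids implicitly assumes indices with zero replicas.
 */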
private fun getShardsCount(index: String): Int {
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index)
return allShards.size
}

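/**
 * Builds the initial lastRunContext for the index: the index name, the shard count, and the current
 * max seq_no of each shard keyed by shard id, e.g. {"index": "logs", "shards_count": 2, "0": 534, "1": 527}.
 */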
private fun createRunContext(index: String): HashMap<String, Any> {
val lastRunContext = HashMap<String, Any>()
lastRunContext["index"] = index
val count = getShardsCount(index)
lastRunContext["shards_count"] = count

for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(index, shard)
lastRunContext[shard] = maxSeqNo
}
return lastRunContext
}

/**
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private fun getMaxSeqNo(index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
val response: SearchResponse = client.search(request).actionGet()
if (response.status() != RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
if (response.hits.hits.isEmpty())
return -1L

Review comment (Member): Should this be 0 instead? Line 809 can throw errors since you can't have a seq# less than or equal to -1.

return response.hits.hits[0].seqNo
}

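/**
 * Searches a single shard for documents in the seq_no range (prevSeqNo, maxSeqNo] that match the
 * given query string: a bool query combining a range filter on _seq_no with the user query as a
 * must clause.
 */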
private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits {
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))

val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
logger.info("Request: $request")
val response: SearchResponse = client.search(request).actionGet()
if (response.status() != RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
}
return response.hits
}

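/**
 * Creates a Finding for the matched documents and indexes it into the (currently hardcoded)
 * findings index. Trigger fields are left null until triggers/actions are integrated.
 */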
private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = getAllDocIds(hits),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = null, // todo: add once integrated with actions/triggers
triggerName = null // todo: add once integrated with actions/triggers
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
// todo: change this log to debug level.
logger.info("Findings: $findingStr")

// todo: the below is hardcoded temp code, added only for testing. Replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)

client.index(indexRequest).actionGet()
}

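/** Concatenates the ids of all hits into a single comma-separated string. */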
private fun getAllDocIds(hits: SearchHits): String {
return hits.joinToString(",") { it.id }
}
}