Skip to content

Commit

Permalink
Alias support for Document Level Monitors (#416)
Browse files Browse the repository at this point in the history
* Implemented support for defining doc level monitors using aliases.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Fix integ tests and cleaup alias logic

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>

Co-authored-by: AWSHurneyt <hurneyt@amazon.com>
  • Loading branch information
lezzago and AWSHurneyt authored Apr 21, 2022
1 parent f0e8e20 commit c29956b
Show file tree
Hide file tree
Showing 15 changed files with 580 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
Expand Down Expand Up @@ -57,17 +58,16 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
periodEnd: Instant,
dryrun: Boolean
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.info("Document-level-monitor is running ...")
logger.debug("Document-level-monitor is running ...")
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

// TODO: is this needed from Charlie?
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex()
} catch (e: Exception) {
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
logger.error("Error loading alerts for monitor: $id", e)
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
return monitorResult.copy(error = e)
}

Expand All @@ -83,62 +83,87 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var lastRunContext = monitor.lastRunContext.toMutableMap()
try {
if (lastRunContext.isNullOrEmpty()) {
lastRunContext = createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, index).toMutableMap()
}
} catch (e: Exception) {
logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}")
return monitorResult.copy(error = e)
}

val count: Int = lastRunContext["shards_count"] as Int
val lastRunContext = if (monitor.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitor.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
lastRunContext[shard] = max(-1, maxSeqNo - 1)
}
}

val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext)
val idQueryMap = mutableMapOf<String, DocLevelQuery>()

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun)
try {
val getAliasesRequest = GetAliasesRequest(index)
val getAliasesResponse = monitorCtx.client!!.admin().indices().getAliases(getAliasesRequest).actionGet()
val aliasIndices = getAliasesResponse.aliases.keys().map { it.value }

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor)
val isAlias = aliasIndices.isNotEmpty()
logger.debug("index, $index, is an alias index: $isAlias")

matchedQueriesForDocs.forEach { hit ->
val (id, query) = Pair(
hit.id.replace("_${monitor.id}", ""),
((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"]
)
val docLevelQuery = DocLevelQuery(id, id, query.toString())

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
if (queryToDocIds.containsKey(docLevelQuery)) {
queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first)
} else {
queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first)
// If the input index is an alias, creating a list of all indices associated with that alias;
// else creating a list containing the single index input
val indices = if (isAlias) getAliasesResponse.aliases.keys().map { it.value } else listOf(index)

indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName)
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
indexName
) as MutableMap<String, Any>
updatedLastRunContext[indexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 1)
}
}

if (docsToQueries.containsKey(matchingDocs[idx].first)) {
docsToQueries[matchingDocs[idx].first]?.add(id)
} else {
docsToQueries[matchingDocs[idx].first] = mutableListOf(id)
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName)

matchedQueriesForDocs.forEach { hit ->
val (id, query) = Pair(
hit.id.replace("_${indexName}_${monitor.id}", ""),
((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"]
)
val docLevelQuery = DocLevelQuery(id, id, query.toString())

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$indexName"
if (queryToDocIds.containsKey(docLevelQuery)) {
queryToDocIds[docLevelQuery]?.add(docIndex)
} else {
queryToDocIds[docLevelQuery] = mutableSetOf(docIndex)
}

if (docsToQueries.containsKey(docIndex)) {
docsToQueries[docIndex]?.add(id)
} else {
docsToQueries[docIndex] = mutableListOf(id)
}
}
}
}
}
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e)
return monitorResult.copy(error = e)
}

val queryInputResults = queryToDocIds.mapKeys { it.key.id }
Expand Down Expand Up @@ -193,17 +218,14 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
logger.info("trigger results")
logger.info(triggerResult.triggeredDocs.toString())

val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0]

// TODO: modify findings such that there is a finding per document
val findings = mutableListOf<String>()
val findingDocPairs = mutableListOf<Pair<String, String>>()

// TODO: Implement throttling for findings
if (!dryrun && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(monitor, monitorCtx, index, triggeredQueries, listOf(it.key))
val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key)
findings.add(findingId)

if (triggerResult.triggeredDocs.contains(it.key)) {
Expand Down Expand Up @@ -244,25 +266,25 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
private fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
docLevelQueries: List<DocLevelQuery>,
matchingDocIds: List<String>
matchingDocId: String
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")

val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = matchingDocIds,
relatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now()
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
// change this to debug.
logger.info("Findings: $findingStr")
logger.debug("Findings: $findingStr")

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
Expand All @@ -273,6 +295,21 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
return finding.id
}

private fun updateLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
}
return updatedLastRunContext
}

private fun validate(monitor: Monitor) {
if (monitor.inputs.size > 1) {
throw IOException("Only one input is supported with document-level-monitor.")
Expand Down Expand Up @@ -337,28 +374,16 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String,
dryrun: Boolean
index: String
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
for (i: Int in 0 until count) {
val shard = i.toString()
try {
logger.info("Monitor execution for shard: $shard")

val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

// If dryrun, set the previous sequence number as 1 less than the max sequence number or 0
val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID)
max(-1, maxSeqNo - 1)
else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

if (dryrun) {
logger.info("it is a dryrun")
}

val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo")

val hits: SearchHits = searchShard(
Expand All @@ -372,7 +397,7 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, monitor.id))
matchingDocs.addAll(getAllDocs(hits, index, monitor.id))
}
} catch (e: Exception) {
logger.info("Failed to run for shard $shard. Error: ${e.message}")
Expand Down Expand Up @@ -419,9 +444,10 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
private fun getMatchedQueries(
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
monitor: Monitor
monitor: Monitor,
index: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder()
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index))

val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON)
if (monitor.id.isNotEmpty()) {
Expand All @@ -442,13 +468,13 @@ object DocumentReturningMonitorRunner : MonitorRunner() {
return response.hits
}

private fun getAllDocs(hits: SearchHits, monitorId: String): List<Pair<String, BytesReference>> {
private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

var xContentBuilder = XContentFactory.jsonBuilder().startObject()
sourceMap.forEach { (k, v) ->
xContentBuilder = xContentBuilder.field("${k}_$monitorId", v)
xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v)
}
xContentBuilder = xContentBuilder.endObject()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,11 @@ data class Monitor(
// Outputting type with each Trigger so that the generic Trigger.readFrom() can read it
out.writeVInt(triggers.size)
triggers.forEach {
if (it is QueryLevelTrigger) out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER)
else if (it is DocumentLevelTrigger) out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER)
else out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER)
when (it) {
is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER)
is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER)
else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER)
}
it.writeTo(out)
}
out.writeMap(lastRunContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RestGetFindingsAction : BaseRestHandler() {
log.info("${request.method()} ${request.path()}")

val findingID: String? = request.param("findingId")
val sortString = request.param("sortString", "id.keyword")
val sortString = request.param("sortString", "id")
val sortOrder = request.param("sortOrder", "asc")
val missing: String? = request.param("missing")
val size = request.paramAsInt("size", 20)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant

private val log = LogManager.getLogger(TransportGetMonitorAction::class.java)
private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java)

class TransportExecuteMonitorAction @Inject constructor(
transportService: TransportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.apache.lucene.search.join.ScoreMode
import org.opensearch.action.ActionListener
import org.opensearch.action.get.MultiGetRequest
import org.opensearch.action.search.SearchRequest
Expand Down Expand Up @@ -90,12 +91,23 @@ class TransportGetFindingsSearchAction @Inject constructor(

if (!tableProp.searchString.isNullOrBlank()) {
queryBuilder
.must(
.should(
QueryBuilders
.queryStringQuery(tableProp.searchString)
.defaultOperator(Operator.AND)
.field("queries.tags")
.field("queries.name")
)
.should(
QueryBuilders.nestedQuery(
"queries",
QueryBuilders.boolQuery()
.must(
QueryBuilders
.queryStringQuery(tableProp.searchString)
.defaultOperator(Operator.AND)
.field("queries.tags")
.field("queries.name")
),
ScoreMode.Avg
)
)
}

Expand Down Expand Up @@ -131,7 +143,7 @@ class TransportGetFindingsSearchAction @Inject constructor(
mgetRequest.add(MultiGetRequest.Item(finding.index, docId))
}
}
val documents = searchDocument(mgetRequest)
val documents = if (mgetRequest.items.isEmpty()) mutableMapOf() else searchDocument(mgetRequest)
findings.forEach {
val documentIds = it.relatedDocIds
val relatedDocs = mutableListOf<FindingDocument>()
Expand Down
Loading

0 comments on commit c29956b

Please sign in to comment.