Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add action and alert flow and findings schema and additional fixes #381

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -166,6 +168,56 @@ class AlertService(
}
}

// TODO: clean this up so it follows the proper alert management for doc monitors
fun composeDocLevelAlert(
lezzago marked this conversation as resolved.
Show resolved Hide resolved
ctx: DocumentLevelTriggerExecutionContext,
result: DocumentLevelTriggerRunResult,
alertError: AlertError?
): Alert {
val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
lezzago marked this conversation as resolved.
Show resolved Hide resolved
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
lezzago marked this conversation as resolved.
Show resolved Hide resolved
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
lezzago marked this conversation as resolved.
Show resolved Hide resolved
}

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
return Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
actionExecutionResults = updatedActionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion,
findingIds = ctx.relatedFindings, relatedDocIds = ctx.triggeredDocs
)
}

fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand All @@ -40,7 +41,6 @@ import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteDestinationAction
import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ import java.time.Instant
object BucketLevelMonitorRunner : MonitorRunner {
private val logger = LogManager.getLogger(javaClass)

override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean):
MonitorRunResult<BucketLevelTriggerRunResult> {
override suspend fun runMonitor(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
): MonitorRunResult<BucketLevelTriggerRunResult> {
lezzago marked this conversation as resolved.
Show resolved Hide resolved
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")

Expand Down Expand Up @@ -82,7 +87,12 @@ object BucketLevelMonitorRunner : MonitorRunner {
// in the final output of monitorResult which occurs when all pages have been exhausted.
// If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths
// with different page counts.
val inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults)
val inputResults = monitorCtx.inputService!!.collectInputResults(
monitor,
periodStart,
periodEnd,
monitorResult.inputResults
)
if (firstIteration) {
firstPageOfInputResults = inputResults
firstIteration = false
Expand Down Expand Up @@ -149,7 +159,8 @@ object BucketLevelMonitorRunner : MonitorRunner {
// in favor of just using the currentAlerts as-is.
currentAlerts.forEach { (trigger, keysToAlertsMap) ->
if (triggerResults[trigger.id]?.error == null)
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)
?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
}

for (trigger in monitor.triggers) {
Expand Down Expand Up @@ -262,24 +273,39 @@ object BucketLevelMonitorRunner : MonitorRunner {
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
monitorCtx.alertService!!.saveAlerts(completedAlertsToUpdate.toList(), monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false)
monitorCtx.alertService!!.saveAlerts(
completedAlertsToUpdate.toList(),
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = false
)
}
}

return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

override suspend fun runAction(action: Action, ctx: TriggerExecutionContext, monitorCtx: MonitorRunnerExecutionContext, dryrun: Boolean): ActionRunResult {
override suspend fun runAction(
action: Action,
ctx: TriggerExecutionContext,
monitorCtx: MonitorRunnerExecutionContext,
dryrun: Boolean
): ActionRunResult {
return try {
val actionOutput = mutableMapOf<String, String>()
actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null)
MonitorRunnerService.compileTemplate(action.subjectTemplate, ctx)
else ""
actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx)
if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
withContext(Dispatchers.IO) {
val destination = AlertingConfigAccessor.getDestinationInfo(monitorCtx.client!!, monitorCtx.xContentRegistry!!, action.destinationId)
val destination = AlertingConfigAccessor.getDestinationInfo(
monitorCtx.client!!,
monitorCtx.xContentRegistry!!,
action.destinationId
)
if (!destination.isAllowed(monitorCtx.allowList)) {
throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}")
}
Expand Down Expand Up @@ -323,8 +349,8 @@ object BucketLevelMonitorRunner : MonitorRunner {
if (totalActionableAlertCount > monitorCtx.maxActionableAlertCount) {
logger.debug(
"The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " +
"which exceeds the maximum of [$(monitorCtx.maxActionableAlertCount)]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " +
"for action execution."
"which exceeds the maximum of [${monitorCtx.maxActionableAlertCount}]. " +
"Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] for action execution."
)
return true
}
Expand Down
Loading