Skip to content

Commit

Permalink
Finding Search API (opensearch-project#385)
Browse files Browse the repository at this point in the history
* Findings search API based on Annie's work

Signed-off-by: Annie Lee <leeyun@amazon.com>

* Fix Search API and add IT tests

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>

Co-authored-by: Annie Lee <leeyun@amazon.com>
  • Loading branch information
lezzago and Annie Lee committed Apr 10, 2022
1 parent 7cdfa7a commit 3a2e24b
Show file tree
Hide file tree
Showing 19 changed files with 778 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetFindingsAction
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.IndexDestinationAction
import org.opensearch.alerting.action.IndexEmailAccountAction
Expand Down Expand Up @@ -53,6 +54,7 @@ import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestIndexDestinationAction
import org.opensearch.alerting.resthandler.RestIndexEmailAccountAction
Expand All @@ -76,6 +78,7 @@ import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportIndexDestinationAction
import org.opensearch.alerting.transport.TransportIndexEmailAccountAction
Expand Down Expand Up @@ -141,6 +144,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val ALERTING_JOB_TYPES = listOf("monitor")
}

Expand Down Expand Up @@ -180,7 +184,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestSearchEmailGroupAction(),
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction()
RestGetAlertsAction(),
RestGetFindingsAction()
)
}

Expand All @@ -204,7 +209,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(DeleteEmailGroupAction.INSTANCE, TransportDeleteEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java)
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(GetFindingsAction.INSTANCE, TransportGetFindingsSearchAction::class.java)

)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ object DocumentReturningMonitorRunner : MonitorRunner {
if (!dryrun && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(monitor, monitorCtx, index, triggeredQueries, listOf(it.key), trigger)
val findingId = createFindings(monitor, monitorCtx, index, triggeredQueries, listOf(it.key))
findings.add(findingId)

if (triggerResult.triggeredDocs.contains(it.key)) {
Expand Down Expand Up @@ -208,8 +208,7 @@ object DocumentReturningMonitorRunner : MonitorRunner {
monitorCtx: MonitorRunnerExecutionContext,
index: String,
docLevelQueries: List<DocLevelQuery>,
matchingDocIds: List<String>,
trigger: DocumentLevelTrigger
matchingDocIds: List<String>
): String {
val finding = Finding(
id = UUID.randomUUID().toString(),
Expand All @@ -218,9 +217,7 @@ object DocumentReturningMonitorRunner : MonitorRunner {
monitorName = monitor.name,
index = index,
docLevelQueries = docLevelQueries,
timestamp = Instant.now(),
triggerId = trigger.id,
triggerName = trigger.name
timestamp = Instant.now()
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionType

class GetFindingsAction private constructor() : ActionType<GetFindingsResponse>(NAME, ::GetFindingsResponse) {
companion object {
val INSTANCE = GetFindingsAction()
val NAME = "cluster:admin/opendistro/alerting/findings/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.alerting.model.Table
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import java.io.IOException

class GetFindingsRequest : ActionRequest {
val findingId: String?
val table: Table

constructor(
findingId: String?,
table: Table
) : super() {
this.findingId = findingId
this.table = table
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
findingId = sin.readOptionalString(),
table = Table.readFrom(sin)
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeOptionalString(findingId)
table.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionResponse
import org.opensearch.alerting.model.FindingWithDocs
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.rest.RestStatus
import java.io.IOException

class GetFindingsResponse : ActionResponse, ToXContentObject {
var status: RestStatus
var totalFindings: Int?
var findings: List<FindingWithDocs>

constructor(
status: RestStatus,
totalFindings: Int?,
findings: List<FindingWithDocs>
) : super() {
this.status = status
this.totalFindings = totalFindings
this.findings = findings
}

@Throws(IOException::class)
constructor(sin: StreamInput) {
this.status = sin.readEnum(RestStatus::class.java)
val findings = mutableListOf<FindingWithDocs>()
this.totalFindings = sin.readOptionalInt()
var currentSize = sin.readInt()
for (i in 0 until currentSize) {
findings.add(FindingWithDocs.readFrom(sin))
}
this.findings = findings
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeEnum(status)
out.writeOptionalInt(totalFindings)
out.writeInt(findings.size)
for (finding in findings) {
finding.writeTo(out)
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("total_findings", totalFindings)
.field("findings", findings)

return builder.endObject()
}
}
35 changes: 11 additions & 24 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ class Finding(
val monitorName: String,
val index: String,
val docLevelQueries: List<DocLevelQuery>,
val timestamp: Instant,
val triggerId: String?,
val triggerName: String?
val timestamp: Instant
) : Writeable, ToXContent {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(),
Expand All @@ -39,9 +38,7 @@ class Finding(
monitorName = sin.readString(),
index = sin.readString(),
docLevelQueries = sin.readList((DocLevelQuery)::readFrom),
timestamp = sin.readInstant(),
triggerId = sin.readOptionalString(),
triggerName = sin.readOptionalString()
timestamp = sin.readInstant()
)

fun asTemplateArg(): Map<String, Any?> {
Expand All @@ -52,9 +49,7 @@ class Finding(
MONITOR_NAME_FIELD to monitorName,
INDEX_FIELD to index,
QUERIES_FIELD to docLevelQueries,
TIMESTAMP_FIELD to timestamp.toEpochMilli(),
TRIGGER_ID_FIELD to triggerId,
TRIGGER_NAME_FIELD to triggerName
TIMESTAMP_FIELD to timestamp.toEpochMilli()
)
}

Expand All @@ -67,8 +62,6 @@ class Finding(
.field(INDEX_FIELD, index)
.field(QUERIES_FIELD, docLevelQueries.toTypedArray())
.field(TIMESTAMP_FIELD, timestamp.toEpochMilli())
.field(TRIGGER_ID_FIELD, triggerId)
.field(TRIGGER_NAME_FIELD, triggerName)
builder.endObject()
return builder
}
Expand All @@ -82,8 +75,6 @@ class Finding(
out.writeString(index)
out.writeCollection(docLevelQueries)
out.writeInstant(timestamp)
out.writeOptionalString(triggerId)
out.writeOptionalString(triggerName)
}

companion object {
Expand All @@ -94,28 +85,26 @@ class Finding(
const val INDEX_FIELD = "index"
const val QUERIES_FIELD = "queries"
const val TIMESTAMP_FIELD = "timestamp"
const val TRIGGER_ID_FIELD = "trigger_id"
const val TRIGGER_NAME_FIELD = "trigger_name"
const val NO_ID = ""

@JvmStatic @JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser, id: String = NO_ID): Finding {
fun parse(xcp: XContentParser): Finding {
var id: String = NO_ID
lateinit var relatedDocId: String
lateinit var monitorId: String
lateinit var monitorName: String
lateinit var index: String
val queries: MutableList<DocLevelQuery> = mutableListOf()
lateinit var timestamp: Instant
lateinit var triggerId: String
lateinit var triggerName: String

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
FINDING_ID_FIELD -> id = xcp.text()
RELATED_DOC_ID_FIELD -> relatedDocId = xcp.text()
MONITOR_ID_FIELD -> monitorId = xcp.text()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
Expand All @@ -126,9 +115,9 @@ class Finding(
queries.add(DocLevelQuery.parse(xcp))
}
}
TIMESTAMP_FIELD -> timestamp = requireNotNull(xcp.instant())
TRIGGER_ID_FIELD -> triggerId = xcp.text()
TRIGGER_NAME_FIELD -> triggerName = xcp.text()
TIMESTAMP_FIELD -> {
timestamp = requireNotNull(xcp.instant())
}
}
}

Expand All @@ -139,9 +128,7 @@ class Finding(
monitorName = monitorName,
index = index,
docLevelQueries = queries,
timestamp = timestamp,
triggerId = triggerId,
triggerName = triggerName
timestamp = timestamp
)
}

Expand Down
Loading

0 comments on commit 3a2e24b

Please sign in to comment.