diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 05c60be8..18a4ba76 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -14,12 +14,18 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.notifications.action.BaseResponse import org.opensearch.commons.utils.recreateObject @@ -53,6 +59,7 @@ object AlertingPluginInterface { } ) } + fun deleteMonitor( client: NodeClient, request: DeleteMonitorRequest, @@ -71,6 +78,49 @@ object AlertingPluginInterface { ) } + /** + * Index monitor interface. + * @param client Node client for making transport action + * @param request The request object + * @param namedWriteableRegistry Registry for building aggregations + * @param listener The listener for getting response + */ + fun indexWorkflow( + client: NodeClient, + request: IndexWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + IndexWorkflowResponse( + it + ) + } + } + ) + } + + fun deleteWorkflow( + client: NodeClient, + request: DeleteWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + DeleteWorkflowResponse( + it + ) + } + } + ) + } + /** * Get Alerts interface. * @param client Node client for making transport action @@ -95,6 +145,30 @@ object AlertingPluginInterface { ) } + /** + * Get Workflow interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun getWorkflow( + client: NodeClient, + request: GetWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.GET_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + GetWorkflowResponse( + it + ) + } + } + ) + } + /** * Get Findings interface. * @param client Node client for making transport action diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index 739217f2..25969ca9 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -8,8 +8,11 @@ import org.opensearch.action.ActionType object AlertingActions { const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write" + const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write" const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get" + const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get" const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete" + const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete" const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get" const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack" @@ -18,14 +21,24 @@ object AlertingActions { ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse) @JvmField + val INDEX_WORKFLOW_ACTION_TYPE = + ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse) + @JvmField val GET_ALERTS_ACTION_TYPE = ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse) + @JvmField + val GET_WORKFLOW_ACTION_TYPE = + ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse) + @JvmField val DELETE_MONITOR_ACTION_TYPE = ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse) @JvmField + val DELETE_WORKFLOW_ACTION_TYPE = + ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse) + @JvmField val GET_FINDINGS_ACTION_TYPE = ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt new file mode 100644 index 00000000..4990f497 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt @@ -0,0 +1,38 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import java.io.IOException + +class DeleteWorkflowRequest : ActionRequest { + + val workflowId: String + /** + * Flag that indicates whether the delegate monitors should be deleted or not. + * If the flag is set to true, Delegate monitors will be deleted only in the case when they are part of the specified workflow and no other. + */ + val deleteDelegateMonitors: Boolean? + + constructor(workflowId: String, deleteDelegateMonitors: Boolean?) : super() { + this.workflowId = workflowId + this.deleteDelegateMonitors = deleteDelegateMonitors + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + deleteDelegateMonitors = sin.readOptionalBoolean() + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeOptionalBoolean(deleteDelegateMonitors) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt new file mode 100644 index 00000000..3aab42ef --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt @@ -0,0 +1,38 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.commons.notifications.action.BaseResponse + +class DeleteWorkflowResponse : BaseResponse { + var id: String + var version: Long + + constructor( + id: String, + version: Long + ) : super() { + this.id = id + this.version = version + } + + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong() // version + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(IndexUtils._ID, id) + .field(IndexUtils._VERSION, version) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt new file mode 100644 index 00000000..1b7948cd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.rest.RestRequest +import java.io.IOException + +class GetWorkflowRequest : ActionRequest { + val workflowId: String + val method: RestRequest.Method + + constructor( + workflowId: String, + method: RestRequest.Method + ) : super() { + this.workflowId = workflowId + this.method = method + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // workflowId + sin.readEnum(RestRequest.Method::class.java) // method + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeEnum(method) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt new file mode 100644 index 00000000..21d0f9c4 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM +import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.notifications.action.BaseResponse +import org.opensearch.rest.RestStatus +import java.io.IOException + +class GetWorkflowResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + private var status: RestStatus + var workflow: Workflow? + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + status: RestStatus, + workflow: Workflow? + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.status = status + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readLong(), // seqNo + sin.readLong(), // primaryTerm + sin.readEnum(RestStatus::class.java), // RestStatus + if (sin.readBoolean()) { + Workflow.readFrom(sin) // monitor + } else null + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + out.writeEnum(status) + if (workflow != null) { + out.writeBoolean(true) + workflow?.writeTo(out) + } else { + out.writeBoolean(false) + } + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + if (workflow != null) + builder.field("workflow", workflow) + + return builder.endObject() + } + + override fun getStatus(): RestStatus { + return this.status + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt new file mode 100644 index 00000000..88fbe3ed --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -0,0 +1,64 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.rest.RestRequest +import java.io.IOException + +class IndexWorkflowRequest : ActionRequest { + val workflowId: String + val seqNo: Long + val primaryTerm: Long + val refreshPolicy: WriteRequest.RefreshPolicy + val method: RestRequest.Method + var workflow: Workflow + val rbacRoles: List? + + constructor( + workflowId: String, + seqNo: Long, + primaryTerm: Long, + refreshPolicy: WriteRequest.RefreshPolicy, + method: RestRequest.Method, + workflow: Workflow, + rbacRoles: List? = null + ) : super() { + this.workflowId = workflowId + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.refreshPolicy = refreshPolicy + this.method = method + this.workflow = workflow + this.rbacRoles = rbacRoles + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin), + method = sin.readEnum(RestRequest.Method::class.java), + workflow = Workflow.readFrom(sin) as Workflow, + rbacRoles = sin.readOptionalStringList() + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + refreshPolicy.writeTo(out) + out.writeEnum(method) + workflow.writeTo(out) + out.writeOptionalStringCollection(rbacRoles) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt new file mode 100644 index 00000000..15c9f904 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt @@ -0,0 +1,61 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.commons.notifications.action.BaseResponse +import java.io.IOException + +class IndexWorkflowResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + var workflow: Workflow + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + workflow: Workflow + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readLong(), // seqNo + sin.readLong(), // primaryTerm + Workflow.readFrom(sin) as Workflow // workflow + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + workflow.writeTo(out) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(IndexUtils._ID, id) + .field(IndexUtils._VERSION, version) + .field(IndexUtils._SEQ_NO, seqNo) + .field(IndexUtils._PRIMARY_TERM, primaryTerm) + .field("workflow", workflow) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt new file mode 100644 index 00000000..689096dd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -0,0 +1,77 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.commons.utils.validateId +import java.io.IOException + +/** + * Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id. + */ +// TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties +data class ChainedMonitorFindings( + val monitorId: String +) : BaseModel { + + init { + validateId(monitorId) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // monitorId + ) + + fun asTemplateArg(): Map { + return mapOf( + MONITOR_ID_FIELD to monitorId, + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(MONITOR_ID_FIELD, monitorId) + .endObject() + return builder + } + + companion object { + const val MONITOR_ID_FIELD = "monitor_id" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ChainedMonitorFindings { + lateinit var monitorId: String + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MONITOR_ID_FIELD -> { + monitorId = xcp.text() + validateId(monitorId) + } + } + } + return ChainedMonitorFindings(monitorId) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ChainedMonitorFindings { + return ChainedMonitorFindings(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt new file mode 100644 index 00000000..3535c90c --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt @@ -0,0 +1,84 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import java.io.IOException + +data class CompositeInput( + val sequence: Sequence +) : WorkflowInput { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + Sequence(sin) + ) + + fun asTemplateArg(): Map { + return mapOf( + SEQUENCE_FIELD to sequence + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + sequence.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(COMPOSITE_INPUT_FIELD) + .field(SEQUENCE_FIELD, sequence) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return COMPOSITE_INPUT_FIELD + } + + fun getMonitorIds(): List { + return sequence.delegates.map { delegate -> delegate.monitorId } + } + + companion object { + const val COMPOSITE_INPUT_FIELD = "composite_input" + const val SEQUENCE_FIELD = "sequence" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + WorkflowInput::class.java, + ParseField(COMPOSITE_INPUT_FIELD), CheckedFunction { CompositeInput.parse(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): CompositeInput { + var sequence = Sequence(emptyList()) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SEQUENCE_FIELD -> { + sequence = Sequence.parse(xcp) + } + } + } + + return CompositeInput(sequence) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): CompositeInput { + return CompositeInput(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt new file mode 100644 index 00000000..a446409e --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt @@ -0,0 +1,118 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.commons.utils.validateId +import java.io.IOException + +/** + * Each underlying monitors defined in the composite monitor sequence input. + * They are executed sequentially in the order mentioned. + * Optionally accepts chained findings context. + * */ +data class Delegate( + /** + * Defines the order of the monitor in delegate list + */ + val order: Int, + /** + * Id of the monitor + */ + val monitorId: String, + /** + * Keeps the track of the previously executed monitor in a chain list. + * Used for pre-filtering by getting the findings doc ids for the given monitor + */ + val chainedMonitorFindings: ChainedMonitorFindings? = null +) : BaseModel { + + init { + validateId(monitorId) + validateOrder(order) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + order = sin.readInt(), + monitorId = sin.readString(), + chainedMonitorFindings = if (sin.readBoolean()) { + ChainedMonitorFindings(sin) + } else null, + ) + + fun asTemplateArg(): Map { + return mapOf( + ORDER_FIELD to order, + MONITOR_ID_FIELD to monitorId, + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeInt(order) + out.writeString(monitorId) + out.writeBoolean(chainedMonitorFindings != null) + chainedMonitorFindings?.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(ORDER_FIELD, order) + .field(MONITOR_ID_FIELD, monitorId) + if (chainedMonitorFindings != null) { + builder.field(CHAINED_FINDINGS_FIELD, chainedMonitorFindings) + } + builder.endObject() + return builder + } + + companion object { + const val ORDER_FIELD = "order" + const val MONITOR_ID_FIELD = "monitor_id" + const val CHAINED_FINDINGS_FIELD = "chained_findings" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Delegate { + lateinit var monitorId: String + var order = 0 + var chainedMonitorFindings: ChainedMonitorFindings? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ORDER_FIELD -> { + order = xcp.intValue() + validateOrder(order) + } + MONITOR_ID_FIELD -> { + monitorId = xcp.text() + validateId(monitorId) + } + CHAINED_FINDINGS_FIELD -> { + chainedMonitorFindings = ChainedMonitorFindings.parse(xcp) + } + } + } + return Delegate(order, monitorId, chainedMonitorFindings) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Delegate { + return Delegate(sin) + } + + fun validateOrder(order: Int) { + require(order > 0) { "Invalid delgate order" } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt index e8102779..98e43517 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt @@ -21,9 +21,33 @@ class Finding( val monitorName: String, val index: String, val docLevelQueries: List, - val timestamp: Instant + val timestamp: Instant, + /** + * Keeps the track of the workflow-monitor exact execution. + * Used for filtering the data when chaining monitors in a workflow. + */ + val executionId: String? = null, ) : Writeable, ToXContent { + constructor( + id: String = NO_ID, + relatedDocIds: List, + monitorId: String, + monitorName: String, + index: String, + docLevelQueries: List, + timestamp: Instant + ) : this ( + id = id, + relatedDocIds = relatedDocIds, + monitorId = monitorId, + monitorName = monitorName, + index = index, + docLevelQueries = docLevelQueries, + timestamp = timestamp, + executionId = null + ) + @Throws(IOException::class) constructor(sin: StreamInput) : this( id = sin.readString(), @@ -32,7 +56,8 @@ class Finding( monitorName = sin.readString(), index = sin.readString(), docLevelQueries = sin.readList((DocLevelQuery)::readFrom), - timestamp = sin.readInstant() + timestamp = sin.readInstant(), + executionId = sin.readOptionalString() ) fun asTemplateArg(): Map { @@ -43,7 +68,8 @@ class Finding( MONITOR_NAME_FIELD to monitorName, INDEX_FIELD to index, QUERIES_FIELD to docLevelQueries, - TIMESTAMP_FIELD to timestamp.toEpochMilli() + TIMESTAMP_FIELD to timestamp.toEpochMilli(), + EXECUTION_ID_FIELD to executionId ) } @@ -56,6 +82,7 @@ class Finding( .field(INDEX_FIELD, index) .field(QUERIES_FIELD, docLevelQueries.toTypedArray()) .field(TIMESTAMP_FIELD, timestamp.toEpochMilli()) + .field(EXECUTION_ID_FIELD, executionId) builder.endObject() return builder } @@ -69,6 +96,7 @@ class Finding( out.writeString(index) out.writeCollection(docLevelQueries) out.writeInstant(timestamp) + out.writeOptionalString(executionId) } companion object { @@ -79,6 +107,7 @@ class Finding( const val INDEX_FIELD = "index" const val QUERIES_FIELD = "queries" const val TIMESTAMP_FIELD = "timestamp" + const val EXECUTION_ID_FIELD = "execution_id" const val NO_ID = "" @JvmStatic @@ -92,6 +121,7 @@ class Finding( lateinit var index: String val queries: MutableList = mutableListOf() lateinit var timestamp: Instant + var executionId: String? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -118,6 +148,7 @@ class Finding( TIMESTAMP_FIELD -> { timestamp = requireNotNull(xcp.instant()) } + EXECUTION_ID_FIELD -> executionId = xcp.textOrNull() } } @@ -128,7 +159,8 @@ class Finding( monitorName = monitorName, index = index, docLevelQueries = queries, - timestamp = timestamp + timestamp = timestamp, + executionId = executionId ) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt new file mode 100644 index 00000000..d8ce97a8 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt @@ -0,0 +1,75 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import java.io.IOException + +/** Delegate monitors passed as input for composite monitors. */ +data class Sequence( + val delegates: List +) : BaseModel { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readList(::Delegate) + ) + + fun asTemplateArg(): Map { + return mapOf( + DELEGATES_FIELD to delegates, + ) + } + + companion object { + const val SEQUENCE_FIELD = "sequence" + const val DELEGATES_FIELD = "delegates" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Sequence { + val delegates: MutableList = mutableListOf() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + DELEGATES_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + delegates.add(Delegate.parse(xcp)) + } + } + } + } + return Sequence(delegates) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): DocLevelMonitorInput { + return DocLevelMonitorInput(sin) + } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeCollection(delegates) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(DELEGATES_FIELD, delegates.toTypedArray()) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt new file mode 100644 index 00000000..a4235fb4 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt @@ -0,0 +1,265 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION +import org.opensearch.commons.alerting.util.IndexUtils.Companion.WORKFLOW_MAX_INPUTS +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.commons.alerting.util.optionalUserField +import org.opensearch.commons.authuser.User +import java.io.IOException +import java.time.Instant +import java.util.Locale + +data class Workflow( + override val id: String = NO_ID, + override val version: Long = NO_VERSION, + override val name: String, + override val enabled: Boolean, + override val schedule: Schedule, + override val lastUpdateTime: Instant, + override val enabledTime: Instant?, + // TODO: Check how this behaves during rolling upgrade/multi-version cluster + // Can read/write and parsing break if it's done from an old -> new version of the plugin? + val workflowType: WorkflowType, + val user: User?, + val schemaVersion: Int = NO_SCHEMA_VERSION, + val inputs: List, + val owner: String? = DEFAULT_OWNER +) : ScheduledJob { + override val type = WORKFLOW_TYPE + + init { + if (enabled) { + requireNotNull(enabledTime) + } else { + require(enabledTime == null) + } + require(inputs.size <= WORKFLOW_MAX_INPUTS) { "Workflows can only have $WORKFLOW_MAX_INPUTS search input." } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + version = sin.readLong(), + name = sin.readString(), + enabled = sin.readBoolean(), + schedule = Schedule.readFrom(sin), + lastUpdateTime = sin.readInstant(), + enabledTime = sin.readOptionalInstant(), + workflowType = sin.readEnum(WorkflowType::class.java), + user = if (sin.readBoolean()) { + User(sin) + } else null, + schemaVersion = sin.readInt(), + inputs = sin.readList((WorkflowInput)::readFrom), + owner = sin.readOptionalString() + ) + + // This enum classifies different workflows + // This is different from 'type' which denotes the Scheduled Job type + enum class WorkflowType(val value: String) { + COMPOSITE("composite"); + + override fun toString(): String { + return value + } + } + + /** Returns a representation of the workflow suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf(_ID to id, _VERSION to version, NAME_FIELD to name, ENABLED_FIELD to enabled) + } + + fun toXContentWithUser(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return createXContentBuilder(builder, params, false) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return createXContentBuilder(builder, params, true) + } + + private fun createXContentBuilder( + builder: XContentBuilder, + params: ToXContent.Params, + secure: Boolean + ): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(type) + builder.field(TYPE_FIELD, type) + .field(SCHEMA_VERSION_FIELD, schemaVersion) + .field(NAME_FIELD, name) + .field(WORKFLOW_TYPE_FIELD, workflowType) + + if (!secure) { + builder.optionalUserField(USER_FIELD, user) + } + + builder.field(ENABLED_FIELD, enabled) + .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) + .field(SCHEDULE_FIELD, schedule) + .field(INPUTS_FIELD, inputs.toTypedArray()) + .optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) + builder.field(OWNER_FIELD, owner) + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + override fun fromDocument(id: String, version: Long): Workflow = copy(id = id, version = version) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeString(name) + out.writeBoolean(enabled) + if (schedule is CronSchedule) { + out.writeEnum(Schedule.TYPE.CRON) + } else { + out.writeEnum(Schedule.TYPE.INTERVAL) + } + schedule.writeTo(out) + out.writeInstant(lastUpdateTime) + out.writeOptionalInstant(enabledTime) + out.writeEnum(workflowType) + out.writeBoolean(user != null) + user?.writeTo(out) + out.writeInt(schemaVersion) + // Outputting type with each Input so that the generic Input.readFrom() can read it + out.writeVInt(inputs.size) + inputs.forEach { + if (it is CompositeInput) out.writeEnum(WorkflowInput.Type.COMPOSITE_INPUT) + it.writeTo(out) + } + // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it + out.writeOptionalString(owner) + } + + companion object { + const val WORKFLOW_DELEGATE_PATH = "workflow.inputs.composite_input.sequence.delegates" + const val WORKFLOW_MONITOR_PATH = "workflow.inputs.composite_input.sequence.delegates.monitor_id" + const val WORKFLOW_TYPE = "workflow" + const val TYPE_FIELD = "type" + const val WORKFLOW_TYPE_FIELD = "workflow_type" + const val SCHEMA_VERSION_FIELD = "schema_version" + const val NAME_FIELD = "name" + const val USER_FIELD = "user" + const val ENABLED_FIELD = "enabled" + const val SCHEDULE_FIELD = "schedule" + const val NO_ID = "" + const val NO_VERSION = 1L + const val INPUTS_FIELD = "inputs" + const val LAST_UPDATE_TIME_FIELD = "last_update_time" + const val ENABLED_TIME_FIELD = "enabled_time" + const val OWNER_FIELD = "owner" + + // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all + // the different subclasses and creating circular dependencies + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + ScheduledJob::class.java, + ParseField(WORKFLOW_TYPE), + CheckedFunction { parse(it) } + ) + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Workflow { + var name: String? = null + var workflowType: String = WorkflowType.COMPOSITE.toString() + var user: User? = null + var schedule: Schedule? = null + var lastUpdateTime: Instant? = null + var enabledTime: Instant? = null + var enabled = true + var schemaVersion = NO_SCHEMA_VERSION + val inputs: MutableList = mutableListOf() + var owner = DEFAULT_OWNER + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() + NAME_FIELD -> name = xcp.text() + WORKFLOW_TYPE_FIELD -> { + workflowType = xcp.text() + val allowedTypes = WorkflowType.values().map { it.value } + if (!allowedTypes.contains(workflowType)) { + throw IllegalStateException("Workflow type should be one of $allowedTypes") + } + } + USER_FIELD -> { + user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) + } + ENABLED_FIELD -> enabled = xcp.booleanValue() + SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) + INPUTS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + val input = WorkflowInput.parse(xcp) + inputs.add(input) + } + } + ENABLED_TIME_FIELD -> enabledTime = xcp.instant() + LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant() + OWNER_FIELD -> { + owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() + } + else -> { + xcp.skipChildren() + } + } + } + + if (enabled && enabledTime == null) { + enabledTime = Instant.now() + } else if (!enabled) { + enabledTime = null + } + return Workflow( + id, + version, + requireNotNull(name) { "Workflow name is null" }, + enabled, + requireNotNull(schedule) { "Workflow schedule is null" }, + lastUpdateTime ?: Instant.now(), + enabledTime, + WorkflowType.valueOf(workflowType.uppercase(Locale.ROOT)), + user, + schemaVersion, + inputs.toList(), + owner + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Workflow? { + return Workflow(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + + private const val DEFAULT_OWNER = "alerting" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt new file mode 100644 index 00000000..682271eb --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt @@ -0,0 +1,48 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import java.io.IOException + +interface WorkflowInput : BaseModel { + + enum class Type(val value: String) { + COMPOSITE_INPUT(CompositeInput.COMPOSITE_INPUT_FIELD); + + override fun toString(): String { + return value + } + } + + companion object { + + @Throws(IOException::class) + fun parse(xcp: XContentParser): WorkflowInput { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val input = if (xcp.currentName() == Type.COMPOSITE_INPUT.value) { + CompositeInput.parse(xcp) + } else { + throw IllegalStateException("Unexpected input type when reading Input") + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + return input + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowInput { + return when (val type = sin.readEnum(Type::class.java)) { + Type.COMPOSITE_INPUT -> CompositeInput(sin) + // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns + // enum can be null in Java + else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") + } + } + } + + fun name(): String +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index 803ab675..51f9be52 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -14,6 +14,7 @@ class IndexUtils { const val NO_SCHEMA_VERSION = 0 const val MONITOR_MAX_INPUTS = 1 + const val WORKFLOW_MAX_INPUTS = 1 const val MONITOR_MAX_TRIGGERS = 10 diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 62e425d9..acce04c1 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -16,15 +16,20 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.RestStatus import org.opensearch.search.SearchModule @@ -55,6 +60,31 @@ internal class AlertingPluginInterfaceTests { Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + @Test + fun indexWorkflow() { + val workflow = randomCompositeWorkflow() + + val request = mock(IndexWorkflowRequest::class.java) + val response = IndexWorkflowResponse( + Workflow.NO_ID, + Workflow.NO_VERSION, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + workflow + ) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.indexWorkflow(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + @Test fun indexBucketMonitor() { val monitor = randomBucketLevelMonitor() @@ -89,6 +119,21 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.deleteMonitor(client, request, listener) } + @Test + fun deleteWorkflow() { + val request = mock(DeleteWorkflowRequest::class.java) + val response = DeleteWorkflowResponse(Workflow.NO_ID, Workflow.NO_VERSION) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.deleteWorkflow(client, request, listener) + } + @Test fun getAlerts() { val monitor = randomQueryLevelMonitor() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 62d13555..32efd461 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -21,7 +21,10 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -32,7 +35,10 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Sequence import org.opensearch.commons.alerting.model.Trigger +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.WorkflowInput import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope @@ -155,6 +161,50 @@ fun randomDocumentLevelMonitor( ) } +fun randomCompositeWorkflow( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User? = randomUser(), + inputs: List? = null, + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + var input = inputs + if (input == null) { + input = listOf( + CompositeInput( + Sequence( + listOf(Delegate(1, "delegate1")) + ) + ) + ) + } + return Workflow( + name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, + schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + ) +} + +fun Workflow.toJsonStringWithUser(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() +} + +fun randomSequence( + delegates: List = listOf(randomDelegate()) +): Sequence { + return Sequence(delegates) +} + +fun randomDelegate( + order: Int = 1, + monitorId: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + chainedMonitorFindings: ChainedMonitorFindings? = null +): Delegate { + return Delegate(order, monitorId, chainedMonitorFindings) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt new file mode 100644 index 00000000..20bcb27e --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt @@ -0,0 +1,23 @@ +package org.opensearch.commons.alerting.action + +import org.junit.Assert +import org.junit.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput + +class DeleteWorkflowRequestTests { + + @Test + fun `test delete workflow request`() { + + val req = DeleteWorkflowRequest("1234", true) + Assert.assertNotNull(req) + Assert.assertEquals("1234", req.workflowId) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = DeleteWorkflowRequest(sin) + Assert.assertEquals("1234", newReq.workflowId) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt new file mode 100644 index 00000000..a3a94166 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt @@ -0,0 +1,96 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput +import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.randomCompositeWorkflow +import org.opensearch.commons.utils.recreateObject +import org.opensearch.rest.RestRequest +import org.opensearch.search.SearchModule + +class IndexWorkflowRequestTests { + + @Test + fun `test index workflow post request`() { + + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomCompositeWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowRequest(sin) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `test index composite workflow post request`() { + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomCompositeWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + val newReq = IndexWorkflowRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry)) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `Index composite workflow serialize and deserialize transport object should be equal`() { + val compositeWorkflowRequest = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomCompositeWorkflow() + ) + + val recreatedObject = recreateObject( + compositeWorkflowRequest, + NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + ) { IndexWorkflowRequest(it) } + Assertions.assertEquals(compositeWorkflowRequest.workflowId, recreatedObject.workflowId) + Assertions.assertEquals(compositeWorkflowRequest.seqNo, recreatedObject.seqNo) + Assertions.assertEquals(compositeWorkflowRequest.primaryTerm, recreatedObject.primaryTerm) + Assertions.assertEquals(compositeWorkflowRequest.method, recreatedObject.method) + Assertions.assertNotNull(recreatedObject.workflow) + Assertions.assertEquals(compositeWorkflowRequest.workflow, recreatedObject.workflow) + } + + @Test + fun `test index workflow put request`() { + + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomCompositeWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowRequest(sin) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.PUT, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt new file mode 100644 index 00000000..523f5650 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt @@ -0,0 +1,45 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.commons.alerting.model.CronSchedule +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.randomUser +import java.time.Instant +import java.time.ZoneId + +class IndexWorkflowResponseTests { + + @Test + fun `test index workflow response with workflow`() { + val cronExpression = "31 * * * *" // Run at minute 31. + val testInstance = Instant.ofEpochSecond(1538164858L) + + val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) + val workflow = Workflow( + id = "123", + version = 0L, + name = "test-workflow", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + workflowType = Workflow.WorkflowType.COMPOSITE, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + ) + val req = IndexWorkflowResponse("1234", 1L, 2L, 0L, workflow) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowResponse(sin) + Assertions.assertEquals("1234", newReq.id) + Assertions.assertEquals(1L, newReq.version) + Assertions.assertNotNull(newReq.workflow) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt new file mode 100644 index 00000000..9680bdbe --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt @@ -0,0 +1,86 @@ +package org.opensearch.commons.alerting.model + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.commons.alerting.randomDelegate +import org.opensearch.commons.alerting.randomSequence + +class CompositeInputTests { + @Test + fun `test sequence asTemplateArgs`() { + val sequence = randomSequence() + // WHEN + val templateArgs = sequence.asTemplateArg() + + // THEN + val templateDelegates = templateArgs + Assertions.assertEquals( + templateDelegates[Sequence.DELEGATES_FIELD], + sequence.delegates, + "Template args 'id' field does not match:" + ) + } + + @Test + fun `test delegate asTemplateArgs`() { + val delegate = randomDelegate() + // WHEN + val templateArgs = delegate.asTemplateArg() + + // THEN + val templateDelegates = templateArgs + Assertions.assertEquals( + templateDelegates[Delegate.ORDER_FIELD], + delegate.order, + "Template args 'id' field does not match:" + ) + Assertions.assertEquals( + templateDelegates[Delegate.MONITOR_ID_FIELD], + delegate.monitorId, + "Template args 'id' field does not match:" + ) + } + + @Test + fun `test create Delegate with illegal order value`() { + try { + randomDelegate(-1) + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + Assertions.assertEquals( + "Invalid delgate order", + e.message + ) + } + } + + @Test + fun `test create Delegate with illegal monitorId value`() { + try { + randomDelegate(1, "") + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("Invalid characters in id") + + ) + } + } + } + + @Test + fun `test create Chained Findings with illegal monitorId value`() { + try { + ChainedMonitorFindings("") + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("Invalid characters in id") + + ) + } + } + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index fa93da5f..7ae7ab91 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -16,6 +16,7 @@ import org.opensearch.commons.alerting.randomActionWithPolicy import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger +import org.opensearch.commons.alerting.randomCompositeWorkflow import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -159,6 +160,14 @@ class XContentTests { Assertions.assertEquals(monitor, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") } + @Test + fun `test composite workflow parsing`() { + val workflow = randomCompositeWorkflow() + val monitorString = workflow.toJsonStringWithUser() + val parsedMonitor = Workflow.parse(parser(monitorString)) + Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") + } + @Test fun `test query-level trigger parsing`() { val trigger = randomQueryLevelTrigger()