diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt index 88fbe3ed..15a895ca 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -2,12 +2,15 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions import org.opensearch.action.support.WriteRequest import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.Workflow import org.opensearch.rest.RestRequest import java.io.IOException +import java.util.stream.Collectors class IndexWorkflowRequest : ActionRequest { val workflowId: String @@ -18,6 +21,8 @@ class IndexWorkflowRequest : ActionRequest { var workflow: Workflow val rbacRoles: List? + private val MAX_DELEGATE_SIZE = 25 + constructor( workflowId: String, seqNo: Long, @@ -48,7 +53,75 @@ class IndexWorkflowRequest : ActionRequest { ) override fun validate(): ActionRequestValidationException? { - return null + var validationException: ActionRequestValidationException? = null + + if (workflow.inputs.isEmpty()) { + validationException = ValidateActions.addValidationError( + "Input list can not be empty.", validationException + ) + return validationException + } + if (workflow.inputs.size > 1) { + validationException = ValidateActions.addValidationError( + "Input list can contain only one element.", validationException + ) + return validationException + } + if (workflow.inputs[0] !is CompositeInput) { + validationException = ValidateActions.addValidationError( + "When creating a workflow input must be CompositeInput", validationException + ) + } + val compositeInput = workflow.inputs[0] as CompositeInput + val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) + + if (monitorIds.isNullOrEmpty()) { + validationException = ValidateActions.addValidationError( + "Delegates list can not be empty.", validationException + ) + // Break the flow because next checks are dependant on non-null monitorIds + return validationException + } + + if (monitorIds.size > MAX_DELEGATE_SIZE) { + validationException = ValidateActions.addValidationError( + "Delegates list can not be larger then $MAX_DELEGATE_SIZE.", validationException + ) + } + + if (monitorIds.toSet().size != monitorIds.size) { + validationException = ValidateActions.addValidationError( + "Duplicate delegates not allowed", validationException + ) + } + val delegates = compositeInput.sequence.delegates + val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet()) + if (orderSet.size != delegates.size) { + validationException = ValidateActions.addValidationError( + "Sequence ordering of delegate monitor shouldn't contain duplicate order values", validationException + ) + } + + val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } + delegates.forEach { + if (it.chainedMonitorFindings != null) { + if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence", + validationException + ) + // Break the flow because next check will generate the NPE + return validationException + } + if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}", + validationException + ) + } + } + } + return validationException } @Throws(IOException::class) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index acce04c1..781097e4 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -62,7 +62,7 @@ internal class AlertingPluginInterfaceTests { @Test fun indexWorkflow() { - val workflow = randomCompositeWorkflow() + val workflow = randomWorkflow() val request = mock(IndexWorkflowRequest::class.java) val response = IndexWorkflowResponse( diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 925900ca..b7f463ed 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -162,16 +162,24 @@ fun randomDocumentLevelMonitor( ) } -fun randomCompositeWorkflow( +fun randomWorkflow( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), user: User? = randomUser(), - inputs: List? = null, + monitorIds: List? = null, schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), enabled: Boolean = Random().nextBoolean(), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), ): Workflow { - var input = inputs + val delegates = mutableListOf() + if (!monitorIds.isNullOrEmpty()) { + delegates.add(Delegate(1, monitorIds[0])) + for (i in 1 until monitorIds.size) { + // Order of monitors in workflow will be the same like forwarded meaning that the first monitorId will be used as second monitor chained finding + delegates.add(Delegate(i + 1, monitorIds [i], ChainedMonitorFindings(monitorIds[i - 1]))) + } + } + var input = listOf(CompositeInput(Sequence(delegates))) if (input == null) { input = listOf( CompositeInput( @@ -187,6 +195,21 @@ fun randomCompositeWorkflow( ) } +fun randomWorkflowWithDelegates( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User? = randomUser(), + input: List, + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + return Workflow( + name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, + schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + ) +} + fun Workflow.toJsonStringWithUser(): String { val builder = XContentFactory.jsonBuilder() return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt index a3a94166..d25cd5b2 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.action +import org.junit.Assert import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.opensearch.action.support.WriteRequest @@ -8,10 +9,18 @@ import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.settings.Settings -import org.opensearch.commons.alerting.randomCompositeWorkflow +import org.opensearch.commons.alerting.model.ChainedMonitorFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.Sequence +import org.opensearch.commons.alerting.randomWorkflow +import org.opensearch.commons.alerting.randomWorkflowWithDelegates import org.opensearch.commons.utils.recreateObject import org.opensearch.rest.RestRequest import org.opensearch.search.SearchModule +import java.lang.Exception +import java.lang.IllegalArgumentException +import java.util.UUID class IndexWorkflowRequestTests { @@ -20,7 +29,7 @@ class IndexWorkflowRequestTests { val req = IndexWorkflowRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, - randomCompositeWorkflow() + randomWorkflow() ) Assertions.assertNotNull(req) @@ -39,7 +48,7 @@ class IndexWorkflowRequestTests { fun `test index composite workflow post request`() { val req = IndexWorkflowRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, - randomCompositeWorkflow() + randomWorkflow() ) Assertions.assertNotNull(req) @@ -59,7 +68,7 @@ class IndexWorkflowRequestTests { fun `Index composite workflow serialize and deserialize transport object should be equal`() { val compositeWorkflowRequest = IndexWorkflowRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, - randomCompositeWorkflow() + randomWorkflow() ) val recreatedObject = recreateObject( @@ -79,7 +88,7 @@ class IndexWorkflowRequestTests { val req = IndexWorkflowRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, - randomCompositeWorkflow() + randomWorkflow() ) Assertions.assertNotNull(req) @@ -93,4 +102,109 @@ class IndexWorkflowRequestTests { Assertions.assertEquals(RestRequest.Method.PUT, newReq.method) Assertions.assertNotNull(newReq.workflow) } + + @Test + fun `test validate`() { + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow(monitorIds = emptyList()) + ) + Assertions.assertNotNull(req) + // Empty input list + var validate = req.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be empty.;")) + // Duplicate delegate + val req1 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow(monitorIds = listOf("1L", "1L", "2L")) + ) + validate = req1.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Duplicate delegates not allowed")) + // Sequence not correct + var delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val req2 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + validate = req2.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) + // Chained finding sequence not correct + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x")) + ) + val req3 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + validate = req3.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-x doesn't exist in sequence")) + // Order not correct + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2")) + ) + val req4 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + validate = req4.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3")) + // Max monitor size + val monitorsIds = mutableListOf() + for (i in 0..25) { + monitorsIds.add(UUID.randomUUID().toString()) + } + val req5 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow( + monitorIds = monitorsIds + ) + ) + validate = req5.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be larger then 25.")) + // Input list empty + val req6 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = emptyList() + ) + ) + validate = req6.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Input list can not be empty.")) + // Input list multiple elements + delegates = listOf( + Delegate(1, "monitor-1") + ) + try { + IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates)), CompositeInput(Sequence(delegates = delegates))) + ) + ) + } catch (ex: Exception) { + Assert.assertTrue(ex is IllegalArgumentException) + Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input.")) + } + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index d0acbe38..c5afed05 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -16,13 +16,13 @@ import org.opensearch.commons.alerting.randomActionWithPolicy import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger -import org.opensearch.commons.alerting.randomCompositeWorkflow import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty +import org.opensearch.commons.alerting.randomWorkflow import org.opensearch.commons.alerting.toJsonString import org.opensearch.commons.alerting.toJsonStringWithUser import org.opensearch.commons.alerting.util.string @@ -163,7 +163,7 @@ class XContentTests { @Test fun `test composite workflow parsing`() { - val workflow = randomCompositeWorkflow() + val workflow = randomWorkflow() val monitorString = workflow.toJsonStringWithUser() val parsedMonitor = Workflow.parse(parser(monitorString)) Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")