Skip to content

Commit

Permalink
Added validation on IndexWorkflowRequest class (opensearch-project#405)
Browse files Browse the repository at this point in the history
* Added validation on IndexWorkflowRequest class

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* Added unit test for index workflow request

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

---------

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
  • Loading branch information
stevanbz authored and eirsep committed May 24, 2023
1 parent 7a13cc2 commit bb3f846
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.rest.RestRequest
import java.io.IOException
import java.util.stream.Collectors

class IndexWorkflowRequest : ActionRequest {
val workflowId: String
Expand All @@ -18,6 +21,8 @@ class IndexWorkflowRequest : ActionRequest {
var workflow: Workflow
val rbacRoles: List<String>?

private val MAX_DELEGATE_SIZE = 25

constructor(
workflowId: String,
seqNo: Long,
Expand Down Expand Up @@ -48,7 +53,75 @@ class IndexWorkflowRequest : ActionRequest {
)

override fun validate(): ActionRequestValidationException? {
return null
var validationException: ActionRequestValidationException? = null

if (workflow.inputs.isEmpty()) {
validationException = ValidateActions.addValidationError(
"Input list can not be empty.", validationException
)
return validationException
}
if (workflow.inputs.size > 1) {
validationException = ValidateActions.addValidationError(
"Input list can contain only one element.", validationException
)
return validationException
}
if (workflow.inputs[0] !is CompositeInput) {
validationException = ValidateActions.addValidationError(
"When creating a workflow input must be CompositeInput", validationException
)
}
val compositeInput = workflow.inputs[0] as CompositeInput
val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList())

if (monitorIds.isNullOrEmpty()) {
validationException = ValidateActions.addValidationError(
"Delegates list can not be empty.", validationException
)
// Break the flow because next checks are dependant on non-null monitorIds
return validationException
}

if (monitorIds.size > MAX_DELEGATE_SIZE) {
validationException = ValidateActions.addValidationError(
"Delegates list can not be larger then $MAX_DELEGATE_SIZE.", validationException
)
}

if (monitorIds.toSet().size != monitorIds.size) {
validationException = ValidateActions.addValidationError(
"Duplicate delegates not allowed", validationException
)
}
val delegates = compositeInput.sequence.delegates
val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet())
if (orderSet.size != delegates.size) {
validationException = ValidateActions.addValidationError(
"Sequence ordering of delegate monitor shouldn't contain duplicate order values", validationException
)
}

val monitorIdOrderMap: Map<String, Int> = delegates.associate { it.monitorId to it.order }
delegates.forEach {
if (it.chainedMonitorFindings != null) {
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence",
validationException
)
// Break the flow because next check will generate the NPE
return validationException
}
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}",
validationException
)
}
}
}
return validationException
}

@Throws(IOException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ internal class AlertingPluginInterfaceTests {

@Test
fun indexWorkflow() {
val workflow = randomCompositeWorkflow()
val workflow = randomWorkflow()

val request = mock(IndexWorkflowRequest::class.java)
val response = IndexWorkflowResponse(
Expand Down
29 changes: 26 additions & 3 deletions src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,24 @@ fun randomDocumentLevelMonitor(
)
}

fun randomCompositeWorkflow(
fun randomWorkflow(
name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10),
user: User? = randomUser(),
inputs: List<WorkflowInput>? = null,
monitorIds: List<String>? = null,
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = Random().nextBoolean(),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
): Workflow {
var input = inputs
val delegates = mutableListOf<Delegate>()
if (!monitorIds.isNullOrEmpty()) {
delegates.add(Delegate(1, monitorIds[0]))
for (i in 1 until monitorIds.size) {
// Order of monitors in workflow will be the same like forwarded meaning that the first monitorId will be used as second monitor chained finding
delegates.add(Delegate(i + 1, monitorIds [i], ChainedMonitorFindings(monitorIds[i - 1])))
}
}
var input = listOf(CompositeInput(Sequence(delegates)))
if (input == null) {
input = listOf(
CompositeInput(
Expand All @@ -187,6 +195,21 @@ fun randomCompositeWorkflow(
)
}

fun randomWorkflowWithDelegates(
name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10),
user: User? = randomUser(),
input: List<WorkflowInput>,
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = Random().nextBoolean(),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
): Workflow {
return Workflow(
name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input,
schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
)
}

fun Workflow.toJsonStringWithUser(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.commons.alerting.action

import org.junit.Assert
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.action.support.WriteRequest
Expand All @@ -8,10 +9,18 @@ import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.randomCompositeWorkflow
import org.opensearch.commons.alerting.model.ChainedMonitorFindings
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.Sequence
import org.opensearch.commons.alerting.randomWorkflow
import org.opensearch.commons.alerting.randomWorkflowWithDelegates
import org.opensearch.commons.utils.recreateObject
import org.opensearch.rest.RestRequest
import org.opensearch.search.SearchModule
import java.lang.Exception
import java.lang.IllegalArgumentException
import java.util.UUID

class IndexWorkflowRequestTests {

Expand All @@ -20,7 +29,7 @@ class IndexWorkflowRequestTests {

val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomCompositeWorkflow()
randomWorkflow()
)
Assertions.assertNotNull(req)

Expand All @@ -39,7 +48,7 @@ class IndexWorkflowRequestTests {
fun `test index composite workflow post request`() {
val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomCompositeWorkflow()
randomWorkflow()
)
Assertions.assertNotNull(req)

Expand All @@ -59,7 +68,7 @@ class IndexWorkflowRequestTests {
fun `Index composite workflow serialize and deserialize transport object should be equal`() {
val compositeWorkflowRequest = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomCompositeWorkflow()
randomWorkflow()
)

val recreatedObject = recreateObject(
Expand All @@ -79,7 +88,7 @@ class IndexWorkflowRequestTests {

val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomCompositeWorkflow()
randomWorkflow()
)
Assertions.assertNotNull(req)

Expand All @@ -93,4 +102,109 @@ class IndexWorkflowRequestTests {
Assertions.assertEquals(RestRequest.Method.PUT, newReq.method)
Assertions.assertNotNull(newReq.workflow)
}

@Test
fun `test validate`() {
val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflow(monitorIds = emptyList())
)
Assertions.assertNotNull(req)
// Empty input list
var validate = req.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be empty.;"))
// Duplicate delegate
val req1 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflow(monitorIds = listOf("1L", "1L", "2L"))
)
validate = req1.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Duplicate delegates not allowed"))
// Sequence not correct
var delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(1, "monitor-2"),
Delegate(2, "monitor-3")
)
val req2 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
validate = req2.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values"))
// Chained finding sequence not correct
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")),
Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x"))
)
val req3 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
validate = req3.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-x doesn't exist in sequence"))
// Order not correct
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")),
Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2"))
)
val req4 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
validate = req4.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3"))
// Max monitor size
val monitorsIds = mutableListOf<String>()
for (i in 0..25) {
monitorsIds.add(UUID.randomUUID().toString())
}
val req5 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflow(
monitorIds = monitorsIds
)
)
validate = req5.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be larger then 25."))
// Input list empty
val req6 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = emptyList()
)
)
validate = req6.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Input list can not be empty."))
// Input list multiple elements
delegates = listOf(
Delegate(1, "monitor-1")
)
try {
IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)), CompositeInput(Sequence(delegates = delegates)))
)
)
} catch (ex: Exception) {
Assert.assertTrue(ex is IllegalArgumentException)
Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input."))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import org.opensearch.commons.alerting.randomActionWithPolicy
import org.opensearch.commons.alerting.randomAlert
import org.opensearch.commons.alerting.randomBucketLevelMonitor
import org.opensearch.commons.alerting.randomBucketLevelTrigger
import org.opensearch.commons.alerting.randomCompositeWorkflow
import org.opensearch.commons.alerting.randomQueryLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser
import org.opensearch.commons.alerting.randomQueryLevelTrigger
import org.opensearch.commons.alerting.randomThrottle
import org.opensearch.commons.alerting.randomUser
import org.opensearch.commons.alerting.randomUserEmpty
import org.opensearch.commons.alerting.randomWorkflow
import org.opensearch.commons.alerting.toJsonString
import org.opensearch.commons.alerting.toJsonStringWithUser
import org.opensearch.commons.alerting.util.string
Expand Down Expand Up @@ -163,7 +163,7 @@ class XContentTests {

@Test
fun `test composite workflow parsing`() {
val workflow = randomCompositeWorkflow()
val workflow = randomWorkflow()
val monitorString = workflow.toJsonStringWithUser()
val parsedMonitor = Workflow.parse(parser(monitorString))
Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")
Expand Down

0 comments on commit bb3f846

Please sign in to comment.