Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added validation on IndexWorkflowRequest class #405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.rest.RestRequest
import java.io.IOException
import java.util.stream.Collectors

class IndexWorkflowRequest : ActionRequest {
val workflowId: String
Expand All @@ -18,6 +21,8 @@ class IndexWorkflowRequest : ActionRequest {
var workflow: Workflow
val rbacRoles: List<String>?

private val MAX_DELEGATE_SIZE = 25

constructor(
workflowId: String,
seqNo: Long,
Expand Down Expand Up @@ -48,7 +53,75 @@ class IndexWorkflowRequest : ActionRequest {
)

override fun validate(): ActionRequestValidationException? {
return null
var validationException: ActionRequestValidationException? = null

if (workflow.inputs.isEmpty()) {
validationException = ValidateActions.addValidationError(
"Input list can not be empty.", validationException
)
return validationException
}
if (workflow.inputs.size > 1) {
validationException = ValidateActions.addValidationError(
"Input list can contain only one element.", validationException
)
return validationException
}
if (workflow.inputs[0] !is CompositeInput) {
validationException = ValidateActions.addValidationError(
"When creating a workflow input must be CompositeInput", validationException
)
}
val compositeInput = workflow.inputs[0] as CompositeInput
val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList())

if (monitorIds.isNullOrEmpty()) {
validationException = ValidateActions.addValidationError(
"Delegates list can not be empty.", validationException
)
// Break the flow because next checks are dependant on non-null monitorIds
return validationException
}

if (monitorIds.size > MAX_DELEGATE_SIZE) {
validationException = ValidateActions.addValidationError(
"Delegates list can not be larger then $MAX_DELEGATE_SIZE.", validationException
)
}

if (monitorIds.toSet().size != monitorIds.size) {
validationException = ValidateActions.addValidationError(
"Duplicate delegates not allowed", validationException
)
}
val delegates = compositeInput.sequence.delegates
val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet())
if (orderSet.size != delegates.size) {
validationException = ValidateActions.addValidationError(
"Sequence ordering of delegate monitor shouldn't contain duplicate order values", validationException
)
}

val monitorIdOrderMap: Map<String, Int> = delegates.associate { it.monitorId to it.order }
delegates.forEach {
if (it.chainedMonitorFindings != null) {
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence",
validationException
)
// Break the flow because next check will generate the NPE
return validationException
}
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}",
validationException
)
}
}
}
return validationException
}

@Throws(IOException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ internal class AlertingPluginInterfaceTests {

@Test
fun indexWorkflow() {
val workflow = randomCompositeWorkflow()
val workflow = randomWorkflow()

val request = mock(IndexWorkflowRequest::class.java)
val response = IndexWorkflowResponse(
Expand Down
29 changes: 26 additions & 3 deletions src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,24 @@ fun randomDocumentLevelMonitor(
)
}

fun randomCompositeWorkflow(
fun randomWorkflow(
name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10),
user: User? = randomUser(),
inputs: List<WorkflowInput>? = null,
monitorIds: List<String>? = null,
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = Random().nextBoolean(),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
): Workflow {
var input = inputs
val delegates = mutableListOf<Delegate>()
if (!monitorIds.isNullOrEmpty()) {
delegates.add(Delegate(1, monitorIds[0]))
for (i in 1 until monitorIds.size) {
// Order of monitors in workflow will be the same like forwarded meaning that the first monitorId will be used as second monitor chained finding
delegates.add(Delegate(i + 1, monitorIds [i], ChainedMonitorFindings(monitorIds[i - 1])))
}
}
var input = listOf(CompositeInput(Sequence(delegates)))
if (input == null) {
input = listOf(
CompositeInput(
Expand All @@ -186,6 +194,21 @@ fun randomCompositeWorkflow(
)
}

fun randomWorkflowWithDelegates(
name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10),
user: User? = randomUser(),
input: List<WorkflowInput>,
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = Random().nextBoolean(),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
): Workflow {
return Workflow(
name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input,
schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
)
}

fun Workflow.toJsonStringWithUser(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.commons.alerting.action

import org.junit.Assert
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.action.support.WriteRequest
Expand All @@ -8,10 +9,18 @@ import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.randomCompositeWorkflow
import org.opensearch.commons.alerting.model.ChainedMonitorFindings
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.Sequence
import org.opensearch.commons.alerting.randomWorkflow
import org.opensearch.commons.alerting.randomWorkflowWithDelegates
import org.opensearch.commons.utils.recreateObject
import org.opensearch.rest.RestRequest
import org.opensearch.search.SearchModule
import java.lang.Exception
import java.lang.IllegalArgumentException
import java.util.UUID

class IndexWorkflowRequestTests {

Expand All @@ -20,7 +29,7 @@ class IndexWorkflowRequestTests {

val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomCompositeWorkflow()
randomWorkflow()
)
Assertions.assertNotNull(req)

Expand All @@ -39,7 +48,7 @@ class IndexWorkflowRequestTests {
fun `test index composite workflow post request`() {
val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomCompositeWorkflow()
randomWorkflow()
)
Assertions.assertNotNull(req)

Expand All @@ -59,7 +68,7 @@ class IndexWorkflowRequestTests {
fun `Index composite workflow serialize and deserialize transport object should be equal`() {
val compositeWorkflowRequest = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomCompositeWorkflow()
randomWorkflow()
)

val recreatedObject = recreateObject(
Expand All @@ -79,7 +88,7 @@ class IndexWorkflowRequestTests {

val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomCompositeWorkflow()
randomWorkflow()
)
Assertions.assertNotNull(req)

Expand All @@ -93,4 +102,109 @@ class IndexWorkflowRequestTests {
Assertions.assertEquals(RestRequest.Method.PUT, newReq.method)
Assertions.assertNotNull(newReq.workflow)
}

@Test
fun `test validate`() {
val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflow(monitorIds = emptyList())
)
Assertions.assertNotNull(req)
// Empty input list
var validate = req.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be empty.;"))
// Duplicate delegate
val req1 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflow(monitorIds = listOf("1L", "1L", "2L"))
)
validate = req1.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Duplicate delegates not allowed"))
// Sequence not correct
var delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(1, "monitor-2"),
Delegate(2, "monitor-3")
)
val req2 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
validate = req2.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values"))
// Chained finding sequence not correct
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")),
Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x"))
)
val req3 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
validate = req3.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-x doesn't exist in sequence"))
// Order not correct
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")),
Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2"))
)
val req4 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
validate = req4.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3"))
// Max monitor size
val monitorsIds = mutableListOf<String>()
for (i in 0..25) {
monitorsIds.add(UUID.randomUUID().toString())
}
val req5 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflow(
monitorIds = monitorsIds
)
)
validate = req5.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be larger then 25."))
// Input list empty
val req6 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = emptyList()
)
)
validate = req6.validate()
Assert.assertTrue(validate != null)
Assert.assertTrue(validate!!.message!!.contains("Input list can not be empty."))
// Input list multiple elements
delegates = listOf(
Delegate(1, "monitor-1")
)
try {
IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)), CompositeInput(Sequence(delegates = delegates)))
)
)
} catch (ex: Exception) {
Assert.assertTrue(ex is IllegalArgumentException)
Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input."))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import org.opensearch.commons.alerting.randomActionWithPolicy
import org.opensearch.commons.alerting.randomAlert
import org.opensearch.commons.alerting.randomBucketLevelMonitor
import org.opensearch.commons.alerting.randomBucketLevelTrigger
import org.opensearch.commons.alerting.randomCompositeWorkflow
import org.opensearch.commons.alerting.randomQueryLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser
import org.opensearch.commons.alerting.randomQueryLevelTrigger
import org.opensearch.commons.alerting.randomThrottle
import org.opensearch.commons.alerting.randomUser
import org.opensearch.commons.alerting.randomUserEmpty
import org.opensearch.commons.alerting.randomWorkflow
import org.opensearch.commons.alerting.toJsonString
import org.opensearch.commons.alerting.toJsonStringWithUser
import org.opensearch.commons.alerting.util.string
Expand Down Expand Up @@ -162,7 +162,7 @@ class XContentTests {

@Test
fun `test composite workflow parsing`() {
val workflow = randomCompositeWorkflow()
val workflow = randomWorkflow()
val monitorString = workflow.toJsonStringWithUser()
val parsedMonitor = Workflow.parse(parser(monitorString))
Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")
Expand Down