Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added function for request recreation that considers the writeable re… #303

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.commons.alerting
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.Writeable
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
Expand All @@ -31,26 +32,27 @@ object AlertingPluginInterface {
* Index monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param namedWriteableRegistry Registry for building aggregations
* @param listener The listener for getting response
*/
fun indexMonitor(
client: NodeClient,
request: IndexMonitorRequest,
namedWriteableRegistry: NamedWriteableRegistry,
listener: ActionListener<IndexMonitorResponse>
) {
client.execute(
AlertingActions.INDEX_MONITOR_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
recreateObject(response, namedWriteableRegistry) {
IndexMonitorResponse(
it
)
}
}
)
}

fun deleteMonitor(
client: NodeClient,
request: DeleteMonitorRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.commons.utils

import org.opensearch.common.io.stream.InputStreamStreamInput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.OutputStreamStreamOutput
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
Expand Down Expand Up @@ -36,3 +38,19 @@ inline fun <reified Request> recreateObject(writeable: Writeable, block: (Stream
}
}
}

/**
* Re create the object from the writeable. Uses NamedWriteableRegistry in order to build the aggregations.
* This method needs to be inline and reified so that when this is called from
* doExecute() of transport action, the object may be created from other JVM.
*/
inline fun <reified Request> recreateObject(writeable: Writeable, namedWriteableRegistry: NamedWriteableRegistry, block: (StreamInput) -> Request): Request {
ByteArrayOutputStream().use { byteArrayOutputStream ->
OutputStreamStreamOutput(byteArrayOutputStream).use {
writeable.writeTo(it)
InputStreamStreamInput(ByteArrayInputStream(byteArrayOutputStream.toByteArray())).use { streamInput ->
return block(NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionType
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
Expand All @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.FindingWithDocs
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.RestStatus
import org.opensearch.search.SearchModule

@Suppress("UNCHECKED_CAST")
@ExtendWith(MockitoExtension::class)
Expand All @@ -41,13 +44,33 @@ internal class AlertingPluginInterfaceTests {
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
val listener: ActionListener<IndexMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())

AlertingPluginInterface.indexMonitor(client, request, listener)
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun indexBucketMonitor() {
val monitor = randomBucketLevelMonitor()

val request = mock(IndexMonitorRequest::class.java)
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
val listener: ActionListener<IndexMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.randomBucketLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitor
import org.opensearch.commons.utils.recreateObject
import org.opensearch.rest.RestRequest
import org.opensearch.search.SearchModule
import org.opensearch.search.builder.SearchSourceBuilder

class IndexMonitorRequestTests {
Expand All @@ -32,6 +38,46 @@ class IndexMonitorRequestTests {
Assertions.assertNotNull(newReq.monitor)
}

@Test
fun `test index bucket monitor post request`() {
val req = IndexMonitorRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomBucketLevelMonitor()
)
Assertions.assertNotNull(req)

val out = BytesStreamOutput()
req.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)
val newReq = IndexMonitorRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry))
Assertions.assertEquals("1234", newReq.monitorId)
Assertions.assertEquals(1L, newReq.seqNo)
Assertions.assertEquals(2L, newReq.primaryTerm)
Assertions.assertEquals(RestRequest.Method.POST, newReq.method)
Assertions.assertNotNull(newReq.monitor)
}

@Test
fun `Index bucket monitor serialize and deserialize transport object should be equal`() {
val bucketLevelMonitorRequest = IndexMonitorRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomBucketLevelMonitor()
)

Assertions.assertThrows(UnsupportedOperationException::class.java) {
recreateObject(bucketLevelMonitorRequest) { IndexMonitorRequest(it) }
}

val recreatedObject = recreateObject(bucketLevelMonitorRequest, NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)) { IndexMonitorRequest(it) }
Assertions.assertEquals(bucketLevelMonitorRequest.monitorId, recreatedObject.monitorId)
Assertions.assertEquals(bucketLevelMonitorRequest.seqNo, recreatedObject.seqNo)
Assertions.assertEquals(bucketLevelMonitorRequest.primaryTerm, recreatedObject.primaryTerm)
Assertions.assertEquals(bucketLevelMonitorRequest.method, recreatedObject.method)
Assertions.assertNotNull(recreatedObject.monitor)
Assertions.assertEquals(bucketLevelMonitorRequest.monitor, recreatedObject.monitor)
}

@Test
fun `test index monitor put request`() {

Expand Down