Skip to content

Commit

Permalink
Added function for request recreation that considers the writeable re… (
Browse files Browse the repository at this point in the history
opensearch-project#303) (opensearch-project#304)

* Added function for request recreation that considers the writeable registry used for parsing the aggregations

Co-authored-by: Stevan Buzejic <stevan.buzejic@htecgroup.com>
(cherry picked from commit 6a169f7)

Co-authored-by: Stevan Buzejic <30922513+stevanbz@users.noreply.github.com>
Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
  • Loading branch information
2 people authored and AWSHurneyt committed Apr 12, 2024
1 parent f658e9a commit 5106dc1
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.commons.alerting
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.Writeable
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
Expand All @@ -31,26 +32,27 @@ object AlertingPluginInterface {
* Index monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param namedWriteableRegistry Registry for building aggregations
* @param listener The listener for getting response
*/
fun indexMonitor(
client: NodeClient,
request: IndexMonitorRequest,
namedWriteableRegistry: NamedWriteableRegistry,
listener: ActionListener<IndexMonitorResponse>
) {
client.execute(
AlertingActions.INDEX_MONITOR_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
recreateObject(response, namedWriteableRegistry) {
IndexMonitorResponse(
it
)
}
}
)
}

fun deleteMonitor(
client: NodeClient,
request: DeleteMonitorRequest,
Expand Down
18 changes: 18 additions & 0 deletions src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.commons.utils

import org.opensearch.common.io.stream.InputStreamStreamInput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.OutputStreamStreamOutput
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
Expand Down Expand Up @@ -36,3 +38,19 @@ inline fun <reified Request> recreateObject(writeable: Writeable, block: (Stream
}
}
}

/**
* Re create the object from the writeable. Uses NamedWriteableRegistry in order to build the aggregations.
* This method needs to be inline and reified so that when this is called from
* doExecute() of transport action, the object may be created from other JVM.
*/
inline fun <reified Request> recreateObject(writeable: Writeable, namedWriteableRegistry: NamedWriteableRegistry, block: (StreamInput) -> Request): Request {
ByteArrayOutputStream().use { byteArrayOutputStream ->
OutputStreamStreamOutput(byteArrayOutputStream).use {
writeable.writeTo(it)
InputStreamStreamInput(ByteArrayInputStream(byteArrayOutputStream.toByteArray())).use { streamInput ->
return block(NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionType
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
Expand All @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.FindingWithDocs
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.RestStatus
import org.opensearch.search.SearchModule

@Suppress("UNCHECKED_CAST")
@ExtendWith(MockitoExtension::class)
Expand All @@ -41,13 +44,33 @@ internal class AlertingPluginInterfaceTests {
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
val listener: ActionListener<IndexMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())

AlertingPluginInterface.indexMonitor(client, request, listener)
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun indexBucketMonitor() {
val monitor = randomBucketLevelMonitor()

val request = mock(IndexMonitorRequest::class.java)
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
val listener: ActionListener<IndexMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.randomBucketLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitor
import org.opensearch.commons.utils.recreateObject
import org.opensearch.rest.RestRequest
import org.opensearch.search.SearchModule
import org.opensearch.search.builder.SearchSourceBuilder

class IndexMonitorRequestTests {
Expand All @@ -32,6 +38,46 @@ class IndexMonitorRequestTests {
Assertions.assertNotNull(newReq.monitor)
}

@Test
fun `test index bucket monitor post request`() {
val req = IndexMonitorRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomBucketLevelMonitor()
)
Assertions.assertNotNull(req)

val out = BytesStreamOutput()
req.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)
val newReq = IndexMonitorRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry))
Assertions.assertEquals("1234", newReq.monitorId)
Assertions.assertEquals(1L, newReq.seqNo)
Assertions.assertEquals(2L, newReq.primaryTerm)
Assertions.assertEquals(RestRequest.Method.POST, newReq.method)
Assertions.assertNotNull(newReq.monitor)
}

@Test
fun `Index bucket monitor serialize and deserialize transport object should be equal`() {
val bucketLevelMonitorRequest = IndexMonitorRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomBucketLevelMonitor()
)

Assertions.assertThrows(UnsupportedOperationException::class.java) {
recreateObject(bucketLevelMonitorRequest) { IndexMonitorRequest(it) }
}

val recreatedObject = recreateObject(bucketLevelMonitorRequest, NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)) { IndexMonitorRequest(it) }
Assertions.assertEquals(bucketLevelMonitorRequest.monitorId, recreatedObject.monitorId)
Assertions.assertEquals(bucketLevelMonitorRequest.seqNo, recreatedObject.seqNo)
Assertions.assertEquals(bucketLevelMonitorRequest.primaryTerm, recreatedObject.primaryTerm)
Assertions.assertEquals(bucketLevelMonitorRequest.method, recreatedObject.method)
Assertions.assertNotNull(recreatedObject.monitor)
Assertions.assertEquals(bucketLevelMonitorRequest.monitor, recreatedObject.monitor)
}

@Test
fun `test index monitor put request`() {

Expand Down

0 comments on commit 5106dc1

Please sign in to comment.