From b835c21ac044ea27ea99f63229e2dbc1be274531 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 2 Nov 2022 12:49:41 -0700 Subject: [PATCH] =?UTF-8?q?Added=20function=20for=20request=20recreation?= =?UTF-8?q?=20that=20considers=20the=20writeable=20re=E2=80=A6=20(#303)=20?= =?UTF-8?q?(#311)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Stevan Buzejic <30922513+stevanbz@users.noreply.github.com> --- .../alerting/AlertingPluginInterface.kt | 6 ++- .../commons/utils/TransportHelpers.kt | 18 ++++++++ .../alerting/AlertingPluginInterfaceTests.kt | 25 +++++++++- .../action/IndexMonitorRequestTests.kt | 46 +++++++++++++++++++ 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index cbeb12fc..05c60be8 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -7,6 +7,7 @@ package org.opensearch.commons.alerting import org.opensearch.action.ActionListener import org.opensearch.action.ActionResponse import org.opensearch.client.node.NodeClient +import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.io.stream.Writeable import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse @@ -31,18 +32,20 @@ object AlertingPluginInterface { * Index monitor interface. * @param client Node client for making transport action * @param request The request object + * @param namedWriteableRegistry Registry for building aggregations * @param listener The listener for getting response */ fun indexMonitor( client: NodeClient, request: IndexMonitorRequest, + namedWriteableRegistry: NamedWriteableRegistry, listener: ActionListener ) { client.execute( AlertingActions.INDEX_MONITOR_ACTION_TYPE, request, wrapActionListener(listener) { response -> - recreateObject(response) { + recreateObject(response, namedWriteableRegistry) { IndexMonitorResponse( it ) @@ -50,7 +53,6 @@ object AlertingPluginInterface { } ) } - fun deleteMonitor( client: NodeClient, request: DeleteMonitorRequest, diff --git a/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt b/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt index 049dabdc..cec7ba81 100644 --- a/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt +++ b/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt @@ -6,6 +6,8 @@ package org.opensearch.commons.utils import org.opensearch.common.io.stream.InputStreamStreamInput +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput +import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.io.stream.OutputStreamStreamOutput import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput @@ -36,3 +38,19 @@ inline fun recreateObject(writeable: Writeable, block: (Stream } } } + +/** + * Re create the object from the writeable. Uses NamedWriteableRegistry in order to build the aggregations. + * This method needs to be inline and reified so that when this is called from + * doExecute() of transport action, the object may be created from other JVM. + */ +inline fun recreateObject(writeable: Writeable, namedWriteableRegistry: NamedWriteableRegistry, block: (StreamInput) -> Request): Request { + ByteArrayOutputStream().use { byteArrayOutputStream -> + OutputStreamStreamOutput(byteArrayOutputStream).use { + writeable.writeTo(it) + InputStreamStreamInput(ByteArrayInputStream(byteArrayOutputStream.toByteArray())).use { streamInput -> + return block(NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry)) + } + } + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 6171b8a9..62e425d9 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -12,6 +12,8 @@ import org.mockito.junit.jupiter.MockitoExtension import org.opensearch.action.ActionListener import org.opensearch.action.ActionType import org.opensearch.client.node.NodeClient +import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse import org.opensearch.commons.alerting.action.GetAlertsRequest @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.FindingWithDocs import org.opensearch.commons.alerting.model.Monitor import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.RestStatus +import org.opensearch.search.SearchModule @Suppress("UNCHECKED_CAST") @ExtendWith(MockitoExtension::class) @@ -41,13 +44,33 @@ internal class AlertingPluginInterfaceTests { val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor) val listener: ActionListener = mock(ActionListener::class.java) as ActionListener + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) Mockito.doAnswer { (it.getArgument(2) as ActionListener) .onResponse(response) }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) - AlertingPluginInterface.indexMonitor(client, request, listener) + AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + + @Test + fun indexBucketMonitor() { + val monitor = randomBucketLevelMonitor() + + val request = mock(IndexMonitorRequest::class.java) + val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorRequestTests.kt index ce26c7d1..34a1c334 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorRequestTests.kt @@ -4,10 +4,16 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.opensearch.action.support.WriteRequest import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput +import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitor +import org.opensearch.commons.utils.recreateObject import org.opensearch.rest.RestRequest +import org.opensearch.search.SearchModule import org.opensearch.search.builder.SearchSourceBuilder class IndexMonitorRequestTests { @@ -32,6 +38,46 @@ class IndexMonitorRequestTests { Assertions.assertNotNull(newReq.monitor) } + @Test + fun `test index bucket monitor post request`() { + val req = IndexMonitorRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomBucketLevelMonitor() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + val newReq = IndexMonitorRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry)) + Assertions.assertEquals("1234", newReq.monitorId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.monitor) + } + + @Test + fun `Index bucket monitor serialize and deserialize transport object should be equal`() { + val bucketLevelMonitorRequest = IndexMonitorRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomBucketLevelMonitor() + ) + + Assertions.assertThrows(UnsupportedOperationException::class.java) { + recreateObject(bucketLevelMonitorRequest) { IndexMonitorRequest(it) } + } + + val recreatedObject = recreateObject(bucketLevelMonitorRequest, NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)) { IndexMonitorRequest(it) } + Assertions.assertEquals(bucketLevelMonitorRequest.monitorId, recreatedObject.monitorId) + Assertions.assertEquals(bucketLevelMonitorRequest.seqNo, recreatedObject.seqNo) + Assertions.assertEquals(bucketLevelMonitorRequest.primaryTerm, recreatedObject.primaryTerm) + Assertions.assertEquals(bucketLevelMonitorRequest.method, recreatedObject.method) + Assertions.assertNotNull(recreatedObject.monitor) + Assertions.assertEquals(bucketLevelMonitorRequest.monitor, recreatedObject.monitor) + } + @Test fun `test index monitor put request`() {