From a71731f9f9568e0673abd471bdfc25e75d2d47b1 Mon Sep 17 00:00:00 2001 From: David Venable Date: Sat, 4 Nov 2023 15:26:09 -0500 Subject: [PATCH] Correctly support custom properties in composable index templates in the OpenSearch sink. This resolves #3506. Signed-off-by: David Venable --- .../opensearch/DeclaredOpenSearchVersion.java | 2 +- .../sink/opensearch/OpenSearchSinkIT.java | 286 ++++++++++-------- .../index/ComposableIndexTemplate.java | 9 +- .../index/ComposableTemplateAPIWrapper.java | 61 ++-- .../opensearch/index/LegacyIndexTemplate.java | 2 +- .../ComposableIndexTemplateStrategyTest.java | 89 ++++++ .../ComposableTemplateAPIWrapperTest.java | 164 ++++++---- 7 files changed, 398 insertions(+), 215 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java index 8f62098746..8112f075c3 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/DeclaredOpenSearchVersion.java @@ -33,7 +33,7 @@ private DeclaredOpenSearchVersion(final Distribution distribution, final String } static DeclaredOpenSearchVersion parse(final String versionString) { - if(versionString == null) { + if(versionString == null || versionString.isEmpty()) { return DEFAULT; } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 755ccb5283..bfe49a7be0 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -8,11 +8,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeType; import io.micrometer.core.instrument.Measurement; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.http.util.EntityUtils; -import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -85,6 +85,7 @@ import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; @@ -173,17 +174,17 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); Request request = new Request(HttpMethod.HEAD, indexAlias); Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); final String index = String.format("%s-000001", indexAlias); final Map mappings = getIndexMappings(index); - MatcherAssert.assertThat(mappings, notNullValue()); - MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + assertThat(mappings, notNullValue()); + assertThat((boolean) mappings.get("date_detection"), equalTo(false)); sink.shutdown(); if (isOSBundle()) { // Check managed index await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { - MatcherAssert.assertThat(getIndexPolicyId(index), equalTo(IndexConstants.RAW_ISM_POLICY)); + assertThat(getIndexPolicyId(index), equalTo(IndexConstants.RAW_ISM_POLICY)); } ); } @@ -192,7 +193,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias)); request.setJsonEntity("{ \"conditions\" : { } }\n"); response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); // Instantiate sink again sink = createObjectUnderTest(pluginSetting, true); @@ -200,12 +201,12 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { final String rolloverIndexName = String.format("%s-000002", indexAlias); request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); response = client.performRequest(request); - MatcherAssert.assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); + assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); sink.shutdown(); if (isOSBundle()) { // Check managed index - MatcherAssert.assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.RAW_ISM_POLICY)); + assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.RAW_ISM_POLICY)); } } @@ -240,21 +241,21 @@ public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompress final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final List> retSources = getSearchResponseDocSources(expIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(2)); - MatcherAssert.assertThat(retSources, hasItems(expData1, expData2)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(2)); + assertThat(retSources, hasItems(expData1, expData2)); + assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData1.get("spanId")), equalTo(Integer.valueOf(1))); sink.shutdown(); // Verify metrics final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); - MatcherAssert.assertThat(bulkRequestErrors.size(), equalTo(1)); + assertThat(bulkRequestErrors.size(), equalTo(1)); Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); // TOTAL_TIME @@ -264,18 +265,18 @@ public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompress final List documentsSuccessMeasurements = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); - MatcherAssert.assertThat(documentsSuccessMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(2.0, 0)); + assertThat(documentsSuccessMeasurements.size(), equalTo(1)); + assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(2.0, 0)); final List documentsSuccessFirstAttemptMeasurements = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(BulkRetryStrategy.DOCUMENTS_SUCCESS_FIRST_ATTEMPT).toString()); - MatcherAssert.assertThat(documentsSuccessFirstAttemptMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentsSuccessFirstAttemptMeasurements.get(0).getValue(), closeTo(2.0, 0)); + assertThat(documentsSuccessFirstAttemptMeasurements.size(), equalTo(1)); + assertThat(documentsSuccessFirstAttemptMeasurements.get(0).getValue(), closeTo(2.0, 0)); final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); - MatcherAssert.assertThat(documentErrorsMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(0.0, 0)); + assertThat(documentErrorsMeasurements.size(), equalTo(1)); + assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(0.0, 0)); /** * Metrics: Bulk Request Size in Bytes @@ -283,11 +284,11 @@ public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompress final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); + assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); + assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 773.0 : 2058.0; - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); } @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) @@ -317,11 +318,11 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress final StringBuilder dlqContent = new StringBuilder(); Files.lines(Paths.get(expDLQFile)).forEach(dlqContent::append); final String nonPrettyJsonString = mapper.writeValueAsString(mapper.readValue(testDoc1, JsonNode.class)); - MatcherAssert.assertThat(dlqContent.toString(), containsString(nonPrettyJsonString)); + assertThat(dlqContent.toString(), containsString(nonPrettyJsonString)); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final List> retSources = getSearchResponseDocSources(expIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.get(0), equalTo(expData)); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.get(0), equalTo(expData)); // clean up temporary directory FileUtils.deleteQuietly(tempDirectory); @@ -330,13 +331,13 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress final List documentsSuccessMeasurements = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); - MatcherAssert.assertThat(documentsSuccessMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(1.0, 0)); + assertThat(documentsSuccessMeasurements.size(), equalTo(1)); + assertThat(documentsSuccessMeasurements.get(0).getValue(), closeTo(1.0, 0)); final List documentErrorsMeasurements = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); - MatcherAssert.assertThat(documentErrorsMeasurements.size(), equalTo(1)); - MatcherAssert.assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(1.0, 0)); + assertThat(documentErrorsMeasurements.size(), equalTo(1)); + assertThat(documentErrorsMeasurements.get(0).getValue(), closeTo(1.0, 0)); /** * Metrics: Bulk Request Size in Bytes @@ -344,11 +345,11 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); + assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); + assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1066.0 : 2072.0; - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); } @@ -360,15 +361,15 @@ public void testInstantiateSinkServiceMapDefault() throws IOException { final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); final Request request = new Request(HttpMethod.HEAD, indexAlias); final Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); final Map mappings = getIndexMappings(indexAlias); - MatcherAssert.assertThat(mappings, notNullValue()); - MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + assertThat(mappings, notNullValue()); + assertThat((boolean) mappings.get("date_detection"), equalTo(false)); sink.shutdown(); if (isOSBundle()) { // Check managed index - MatcherAssert.assertThat(getIndexPolicyId(indexAlias), nullValue()); + assertThat(getIndexPolicyId(indexAlias), nullValue()); } } @@ -387,18 +388,18 @@ public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompr sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); final List> retSources = getSearchResponseDocSources(expIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.get(0), equalTo(expData)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData.get("hashId")), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.get(0), equalTo(expData)); + assertThat(getDocumentCount(expIndexAlias, "_id", (String) expData.get("hashId")), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT - MatcherAssert.assertThat(bulkRequestLatencies.get(0).getValue(), closeTo(1.0, 0)); + assertThat(bulkRequestLatencies.get(0).getValue(), closeTo(1.0, 0)); /** * Metrics: Bulk Request Size in Bytes @@ -406,11 +407,11 @@ public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompr final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); + assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); + assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 366.0 : 265.0; - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); // Check restart for index already exists sink = createObjectUnderTest(pluginSetting, true); @@ -428,7 +429,7 @@ public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; final Request request = new Request(HttpMethod.HEAD, testIndexAlias + extraURI); final Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); sink.shutdown(); // Check restart for index already exists @@ -436,32 +437,54 @@ public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { sink.shutdown(); } - @Test - @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) - public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { + @ParameterizedTest + @ArgumentsSource(CreateSingleWithTemplatesArgumentsProvider.class) + //@DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) + public void testInstantiateSinkCustomIndex_WithIsmPolicy( + final String templateType, + final String templateFile) throws IOException { final String indexAlias = "sink-custom-index-ism-test-alias"; final String testTemplateFile = Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); + getClass().getClassLoader().getResource(templateFile)).getFile(); final Map metadata = initializeConfigurationMetadata(null, indexAlias, testTemplateFile); metadata.put(IndexConfiguration.ISM_POLICY_FILE, TEST_CUSTOM_INDEX_POLICY_FILE); + metadata.put(IndexConfiguration.TEMPLATE_TYPE, templateType); final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; Request request = new Request(HttpMethod.HEAD, indexAlias + extraURI); Response response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); final String index = String.format("%s-000001", indexAlias); final Map mappings = getIndexMappings(index); - MatcherAssert.assertThat(mappings, notNullValue()); - MatcherAssert.assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + assertThat(mappings, notNullValue()); + assertThat((boolean) mappings.get("date_detection"), equalTo(false)); + sink.shutdown(); + JsonNode settings = getIndexSettings(index); + + assertThat(settings, notNullValue()); + JsonNode settingsIndexNode = settings.get("index"); + assertThat(settingsIndexNode, notNullValue()); + assertThat(settingsIndexNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + //assertThat(settingsIndexNode.get("opendistro.index_state_management.rollover_alias"), notNullValue()); + assertThat(settingsIndexNode.get("opendistro"), notNullValue()); + assertThat(settingsIndexNode.get("opendistro").getNodeType(), equalTo(JsonNodeType.OBJECT)); + JsonNode settingsIsmNode = settingsIndexNode.get("opendistro").get("index_state_management"); + assertThat(settingsIsmNode, notNullValue()); + assertThat(settingsIsmNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertThat(settingsIsmNode.get("rollover_alias"), notNullValue()); + assertThat(settingsIsmNode.get("rollover_alias").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(settingsIsmNode.get("rollover_alias").textValue(), equalTo(indexAlias)); + final String expectedIndexPolicyName = indexAlias + "-policy"; if (isOSBundle()) { // Check managed index await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { - MatcherAssert.assertThat(getIndexPolicyId(index), equalTo(expectedIndexPolicyName)); + assertThat(getIndexPolicyId(index), equalTo(expectedIndexPolicyName)); } ); } @@ -470,7 +493,7 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias)); request.setJsonEntity("{ \"conditions\" : { } }\n"); response = client.performRequest(request); - MatcherAssert.assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); // Instantiate sink again sink = createObjectUnderTest(pluginSetting, true); @@ -478,12 +501,12 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy() throws IOException { final String rolloverIndexName = String.format("%s-000002", indexAlias); request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); response = client.performRequest(request); - MatcherAssert.assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); + assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true)); sink.shutdown(); if (isOSBundle()) { // Check managed index - MatcherAssert.assertThat(getIndexPolicyId(rolloverIndexName), equalTo(expectedIndexPolicyName)); + assertThat(getIndexPolicyId(rolloverIndexName), equalTo(expectedIndexPolicyName)); } } @@ -509,14 +532,14 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( Request getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); Response getTemplateResponse = client.performRequest(getTemplateRequest); - MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); String responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); @SuppressWarnings("unchecked") final Integer firstResponseVersion = extractVersionFunction.apply(createContentParser(XContentType.JSON.xContent(), responseBody).map(), expectedIndexTemplateName); - MatcherAssert.assertThat(firstResponseVersion, equalTo(Integer.valueOf(1))); + assertThat(firstResponseVersion, equalTo(Integer.valueOf(1))); sink.shutdown(); // Create sink with template version 2 @@ -526,14 +549,14 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); getTemplateResponse = client.performRequest(getTemplateRequest); - MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); @SuppressWarnings("unchecked") final Integer secondResponseVersion = extractVersionFunction.apply(createContentParser(XContentType.JSON.xContent(), responseBody).map(), expectedIndexTemplateName); - MatcherAssert.assertThat(secondResponseVersion, equalTo(Integer.valueOf(2))); + assertThat(secondResponseVersion, equalTo(Integer.valueOf(2))); sink.shutdown(); // Create sink with template version 1 again @@ -543,7 +566,7 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); getTemplateResponse = client.performRequest(getTemplateRequest); - MatcherAssert.assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); + assertThat(getTemplateResponse.getStatusLine().getStatusCode(), equalTo(SC_OK)); responseBody = EntityUtils.toString(getTemplateResponse.getEntity()); @SuppressWarnings("unchecked") final Integer thirdResponseVersion = @@ -551,7 +574,7 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( responseBody).map(), expectedIndexTemplateName); // Assert version 2 was not overwritten by version 1 - MatcherAssert.assertThat(thirdResponseVersion, equalTo(Integer.valueOf(2))); + assertThat(thirdResponseVersion, equalTo(Integer.valueOf(2))); sink.shutdown(); } @@ -568,7 +591,7 @@ public Stream provideArguments(ExtensionContext context) { ) ); - if(OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.OPENDISTRO_1_9) >= 0) { + if (OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.OPENDISTRO_1_9) >= 0) { arguments.add( arguments("index-template", "_index_template", TEST_INDEX_TEMPLATE_V1_FILE, TEST_INDEX_TEMPLATE_V2_FILE, @@ -581,6 +604,19 @@ public Stream provideArguments(ExtensionContext context) { } } + static class CreateSingleWithTemplatesArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + final List arguments = new ArrayList<>(); + arguments.add(arguments("v1", TEST_TEMPLATE_V1_FILE)); + + if (OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.OPENDISTRO_1_9) >= 0) { + arguments.add(arguments("index-template", TEST_INDEX_TEMPLATE_V1_FILE)); + } + return arguments.stream(); + } + } + @Test public void testOutputCustomIndex() throws IOException, InterruptedException { final String testIndexAlias = "test-alias"; @@ -594,15 +630,15 @@ public void testOutputCustomIndex() throws IOException, InterruptedException { final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } @@ -621,15 +657,15 @@ public void testOpenSearchBulkActionsCreate() throws IOException, InterruptedExc final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } @@ -644,7 +680,7 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - Event event = (Event)testRecords.get(0).getData(); + Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "create"); when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); @@ -652,15 +688,15 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } @@ -675,7 +711,7 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - Event event = (Event)testRecords.get(0).getData(); + Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "unknown"); when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); @@ -683,8 +719,8 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(0)); - MatcherAssert.assertThat(sink.getInvalidActionErrorsCount(), equalTo(1.0)); + assertThat(retSources.size(), equalTo(0)); + assertThat(sink.getInvalidActionErrorsCount(), equalTo(1.0)); sink.shutdown(); } @@ -707,15 +743,15 @@ public void testBulkActionCreateWithActions() throws IOException, InterruptedExc final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } @@ -740,15 +776,15 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value2"))); @@ -761,10 +797,10 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc sink.output(testRecords); retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.size(), equalTo(1)); Map source = retSources.get(0); - MatcherAssert.assertThat((String)source.get("name"), equalTo("value2")); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat((String) source.get("name"), equalTo("value2")); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); } @@ -788,15 +824,15 @@ public void testBulkActionUpsertWithActions() throws IOException, InterruptedExc OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson3(testIdField, testId, "name", "value3", "newKey", "newValue"))); @@ -809,11 +845,11 @@ public void testBulkActionUpsertWithActions() throws IOException, InterruptedExc sink.output(testRecords); retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.size(), equalTo(1)); Map source = retSources.get(0); - MatcherAssert.assertThat((String)source.get("name"), equalTo("value3")); - MatcherAssert.assertThat((String)source.get("newKey"), equalTo("newValue")); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat((String) source.get("name"), equalTo("value3")); + assertThat((String) source.get("newKey"), equalTo("newValue")); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); } @@ -837,17 +873,17 @@ public void testBulkActionUpsertWithoutCreate() throws IOException, InterruptedE sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.size(), equalTo(1)); Map source = retSources.get(0); - MatcherAssert.assertThat((String)source.get("name"), equalTo("value1")); - MatcherAssert.assertThat((String)source.get("newKey"), equalTo("newValue")); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat((String) source.get("name"), equalTo("value1")); + assertThat((String) source.get("newKey"), equalTo("newValue")); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } @@ -872,7 +908,7 @@ public void testBulkActionDeleteWithActions() throws IOException, InterruptedExc OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(0)); + assertThat(retSources.size(), equalTo(0)); sink.shutdown(); } @@ -898,9 +934,9 @@ public void testEventOutputWithTags() throws IOException, InterruptedException { expectedContent.put("log", "foobar"); expectedContent.put(testTagsTargetKey, tagsList); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); + assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); sink.shutdown(); } @@ -924,9 +960,9 @@ public void testEventOutput() throws IOException, InterruptedException { final Map expectedContent = new HashMap<>(); expectedContent.put("log", "foobar"); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); - MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); + assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); sink.shutdown(); } @@ -950,7 +986,7 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO final List docIds = getSearchResponseDocIds(testIndexAlias); for (String docId : docIds) { - MatcherAssert.assertThat(docId, equalTo(expectedId)); + assertThat(docId, equalTo(expectedId)); } sink.shutdown(); } @@ -975,7 +1011,7 @@ public void testOpenSearchRoutingField(final String testRoutingField) throws IOE final List routingFields = getSearchResponseRoutingFields(testIndexAlias); for (String routingField : routingFields) { - MatcherAssert.assertThat(routingField, equalTo(expectedRoutingField)); + assertThat(routingField, equalTo(expectedRoutingField)); } sink.shutdown(); } @@ -1002,8 +1038,8 @@ public void testOpenSearchDynamicIndex(final String testIndex) throws IOExceptio final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources, hasItem(expectedMap)); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources, hasItem(expectedMap)); sink.shutdown(); } @@ -1035,8 +1071,8 @@ public void testOpenSearchDynamicIndexWithDate(final String testIndex, final Str final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(expectedIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources, hasItem(expectedMap)); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources, hasItem(expectedMap)); sink.shutdown(); } @@ -1063,8 +1099,8 @@ public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOE final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(expectedIndexName); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(retSources, hasItem(expectedMap)); + assertThat(retSources.size(), equalTo(1)); + assertThat(retSources, hasItem(expectedMap)); sink.shutdown(); } @@ -1118,15 +1154,15 @@ public void testOutputManagementDisabled() throws IOException, InterruptedExcept sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); - MatcherAssert.assertThat(retSources.size(), equalTo(1)); - MatcherAssert.assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + assertThat(retSources.size(), equalTo(1)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); sink.shutdown(); // verify metrics final List bulkRequestLatencies = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_LATENCY).toString()); - MatcherAssert.assertThat(bulkRequestLatencies.size(), equalTo(3)); + assertThat(bulkRequestLatencies.size(), equalTo(3)); // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } @@ -1309,6 +1345,20 @@ private Map getIndexMappings(final String index) throws IOExcept return mappings; } + private JsonNode getIndexSettings(final String index) throws IOException { + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; + final Request request = new Request(HttpMethod.GET, index + "/_settings" + extraURI); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + + Map responseMap = createContentParser(XContentType.JSON.xContent(), responseBody).map(); + + return new ObjectMapper().convertValue(responseMap, JsonNode.class) + .get(index) + .get("settings"); + } + private String getIndexPolicyId(final String index) throws IOException { // TODO: replace with new _opensearch API final Request request = new Request(HttpMethod.GET, "/_opendistro/_ism/explain/" + index); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplate.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplate.java index f7a42d278b..d94fcfe8cd 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplate.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplate.java @@ -6,7 +6,9 @@ import java.util.Map; import java.util.Optional; -public class ComposableIndexTemplate implements IndexTemplate { +class ComposableIndexTemplate implements IndexTemplate { + static final String TEMPLATE_KEY = "template"; + static final String INDEX_SETTINGS_KEY = "settings"; private final Map indexTemplateMap; private String name; @@ -18,7 +20,6 @@ public ComposableIndexTemplate(final Map indexTemplateMap) { @Override public void setTemplateName(final String name) { this.name = name; - } @Override @@ -28,7 +29,11 @@ public void setIndexPatterns(final List indexPatterns) { @Override public void putCustomSetting(final String name, final Object value) { + Map template = (Map) indexTemplateMap.computeIfAbsent(TEMPLATE_KEY, key -> new HashMap<>()); + + Map settings = (Map) template.computeIfAbsent(INDEX_SETTINGS_KEY, key -> new HashMap<>()); + settings.put(name, value); } @Override diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapper.java index 1b78cb9da5..d2a72a6e75 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapper.java @@ -1,26 +1,21 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.json.stream.JsonParser; -import org.opensearch.client.json.JsonpDeserializer; -import org.opensearch.client.json.JsonpMapper; -import org.opensearch.client.json.ObjectBuilderDeserializer; -import org.opensearch.client.json.ObjectDeserializer; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest; import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse; -import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; -import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping; +import org.opensearch.client.opensearch.indices.PutIndexTemplateResponse; +import org.opensearch.client.transport.Endpoint; import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.transport.endpoints.SimpleEndpoint; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; import java.util.Optional; public class ComposableTemplateAPIWrapper implements IndexTemplateAPIWrapper { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final OpenSearchClient openSearchClient; public ComposableTemplateAPIWrapper(final OpenSearchClient openSearchClient) { @@ -34,19 +29,12 @@ public void putTemplate(final IndexTemplate indexTemplate) throws IOException { } final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate; - final String indexTemplateString = OBJECT_MAPPER.writeValueAsString( - composableIndexTemplate.getIndexTemplateMap()); + Map indexTemplateMap = composableIndexTemplate.getIndexTemplateMap(); - final ByteArrayInputStream byteIn = new ByteArrayInputStream( - indexTemplateString.getBytes(StandardCharsets.UTF_8)); - final JsonpMapper mapper = openSearchClient._transport().jsonpMapper(); - final JsonParser parser = mapper.jsonProvider().createParser(byteIn); - - final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer - .getJsonpDeserializer(composableIndexTemplate.getName()) - .deserialize(parser, mapper); - - openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest); + openSearchClient._transport().performRequest( + indexTemplateMap, + createEndpoint(composableIndexTemplate), + openSearchClient._transportOptions()); } @Override @@ -66,24 +54,15 @@ public Optional getTemplate(final String indexTemplate return Optional.of(openSearchClient.indices().getIndexTemplate(getRequest)); } - private static class PutIndexTemplateRequestDeserializer { - private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer objectDeserializer) { + private Endpoint, PutIndexTemplateResponse, ErrorResponse> createEndpoint(final ComposableIndexTemplate composableIndexTemplate) { + final String path = "/_index_template/" + composableIndexTemplate.getName(); - objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name"); - objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()), - "index_patterns"); - objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version"); - objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority"); - objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()), - "composed_of"); - objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template"); - } - - static JsonpDeserializer getJsonpDeserializer(final String name) { - return ObjectBuilderDeserializer - .lazy( - () -> new PutIndexTemplateRequest.Builder().name(name), - PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer); - } + return new SimpleEndpoint<>( + request -> "PUT", + request -> path, + request -> Collections.emptyMap(), + SimpleEndpoint.emptyMap(), + true, + PutIndexTemplateResponse._DESERIALIZER); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/LegacyIndexTemplate.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/LegacyIndexTemplate.java index 1d6e6be00e..4e54d4005c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/LegacyIndexTemplate.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/LegacyIndexTemplate.java @@ -5,7 +5,7 @@ import java.util.Map; import java.util.Optional; -public class LegacyIndexTemplate implements IndexTemplate { +class LegacyIndexTemplate implements IndexTemplate { public static final String SETTINGS_KEY = "settings"; private final Map templateMap; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java index f9e0154007..7668d6fee2 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableIndexTemplateStrategyTest.java @@ -26,9 +26,11 @@ import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasKey; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -36,6 +38,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ComposableIndexTemplate.INDEX_SETTINGS_KEY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ComposableIndexTemplate.TEMPLATE_KEY; @ExtendWith(MockitoExtension.class) class ComposableIndexTemplateStrategyTest { @@ -188,5 +192,90 @@ void getVersion_returns_version_from_root_map_when_provided_as_int() { assertThat(optionalVersion.isPresent(), equalTo(true)); assertThat(optionalVersion.get(), equalTo((long) version)); } + + @Test + void putCustomSetting_with_no_existing_template_adds_template_and_settings() { + final org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplate indexTemplate = + createObjectUnderTest().createIndexTemplate(providedTemplateMap); + + String customKey = UUID.randomUUID().toString(); + String customValue = UUID.randomUUID().toString(); + + indexTemplate.putCustomSetting(customKey, customValue); + + Map indexTemplateMap = ((ComposableIndexTemplate) indexTemplate).getIndexTemplateMap(); + + assertThat(indexTemplateMap, hasKey(TEMPLATE_KEY)); + assertThat(indexTemplateMap.get(TEMPLATE_KEY), instanceOf(Map.class)); + Map templateMap = (Map) indexTemplateMap.get(TEMPLATE_KEY); + assertThat(templateMap, hasKey(INDEX_SETTINGS_KEY)); + assertThat(templateMap.get(INDEX_SETTINGS_KEY), instanceOf(Map.class)); + Map settingsMap = (Map) templateMap.get(INDEX_SETTINGS_KEY); + assertThat(settingsMap, hasKey(customKey)); + assertThat(settingsMap.get(customKey), equalTo(customValue)); + } + + @Test + void putCustomSetting_with_existing_template_adds_settings_to_that_template() { + String existingKey = UUID.randomUUID().toString(); + String existingValue = UUID.randomUUID().toString(); + Map template = new HashMap<>(); + template.put(existingKey, existingValue); + providedTemplateMap.put(TEMPLATE_KEY, template); + + final org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplate indexTemplate = + createObjectUnderTest().createIndexTemplate(providedTemplateMap); + + String customKey = UUID.randomUUID().toString(); + String customValue = UUID.randomUUID().toString(); + + indexTemplate.putCustomSetting(customKey, customValue); + + Map indexTemplateMap = ((ComposableIndexTemplate) indexTemplate).getIndexTemplateMap(); + + assertThat(indexTemplateMap, hasKey(TEMPLATE_KEY)); + assertThat(indexTemplateMap.get(TEMPLATE_KEY), instanceOf(Map.class)); + Map templateMap = (Map) indexTemplateMap.get(TEMPLATE_KEY); + assertThat(templateMap, hasKey(INDEX_SETTINGS_KEY)); + assertThat(templateMap, hasKey(existingKey)); + assertThat(templateMap.get(existingKey), equalTo(existingValue)); + assertThat(templateMap.get(INDEX_SETTINGS_KEY), instanceOf(Map.class)); + Map settingsMap = (Map) templateMap.get(INDEX_SETTINGS_KEY); + assertThat(settingsMap, hasKey(customKey)); + assertThat(settingsMap.get(customKey), equalTo(customValue)); + } + + @Test + void putCustomSetting_with_existing_template_and_settings_puts_settings_to_that_settings() { + String existingKey = UUID.randomUUID().toString(); + String existingValue = UUID.randomUUID().toString(); + Map template = new HashMap<>(); + HashMap existingSettings = new HashMap<>(); + existingSettings.put(existingKey, existingValue); + template.put(INDEX_SETTINGS_KEY, existingSettings); + + providedTemplateMap.put(TEMPLATE_KEY, template); + + final org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplate indexTemplate = + createObjectUnderTest().createIndexTemplate(providedTemplateMap); + + String customKey = UUID.randomUUID().toString(); + String customValue = UUID.randomUUID().toString(); + + indexTemplate.putCustomSetting(customKey, customValue); + + Map indexTemplateMap = ((ComposableIndexTemplate) indexTemplate).getIndexTemplateMap(); + + assertThat(indexTemplateMap, hasKey(TEMPLATE_KEY)); + assertThat(indexTemplateMap.get(TEMPLATE_KEY), instanceOf(Map.class)); + Map templateMap = (Map) indexTemplateMap.get(TEMPLATE_KEY); + assertThat(templateMap, hasKey(INDEX_SETTINGS_KEY)); + assertThat(templateMap.get(INDEX_SETTINGS_KEY), instanceOf(Map.class)); + Map settingsMap = (Map) templateMap.get(INDEX_SETTINGS_KEY); + assertThat(settingsMap, hasKey(customKey)); + assertThat(settingsMap.get(customKey), equalTo(customValue)); + assertThat(settingsMap, hasKey(existingKey)); + assertThat(settingsMap.get(existingKey), equalTo(existingValue)); + } } } \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapperTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapperTest.java index d6040a017d..fe2829656d 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapperTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ComposableTemplateAPIWrapperTest.java @@ -9,14 +9,17 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest; import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse; import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; -import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.PutIndexTemplateResponse; +import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.JsonEndpoint; import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportOptions; import org.opensearch.client.transport.endpoints.BooleanResponse; -import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper; import java.io.IOException; import java.util.Collections; @@ -28,11 +31,15 @@ import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasKey; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -47,11 +54,15 @@ class ComposableTemplateAPIWrapperTest { @Mock private OpenSearchTransport openSearchTransport; @Mock + private TransportOptions openSearchTransportOptions; + @Mock private GetIndexTemplateResponse getIndexTemplateResponse; @Mock private BooleanResponse booleanResponse; @Captor - private ArgumentCaptor putIndexTemplateRequestArgumentCaptor; + private ArgumentCaptor> putIndexTemplateRequestArgumentCaptor; + @Captor + private ArgumentCaptor, PutIndexTemplateResponse, ErrorResponse>> endpointArgumentCaptor; @Captor private ArgumentCaptor existsIndexTemplateRequestArgumentCaptor; @@ -76,7 +87,7 @@ void putTemplate_throws_if_template_is_not_ComposableIndexTemplate() { @Test void putTemplate_performs_putIndexTemplate_request() throws IOException { when(openSearchClient._transport()).thenReturn(openSearchTransport); - when(openSearchTransport.jsonpMapper()).thenReturn(new PreSerializedJsonpMapper()); + when(openSearchClient._transportOptions()).thenReturn(openSearchTransportOptions); final List indexPatterns = Collections.singletonList(UUID.randomUUID().toString()); final IndexTemplate indexTemplate = new ComposableIndexTemplate(new HashMap<>()); @@ -84,12 +95,25 @@ void putTemplate_performs_putIndexTemplate_request() throws IOException { indexTemplate.setIndexPatterns(indexPatterns); objectUnderTest.putTemplate(indexTemplate); - verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); + verify(openSearchTransport).performRequest( + putIndexTemplateRequestArgumentCaptor.capture(), + endpointArgumentCaptor.capture(), + eq(openSearchTransportOptions) + ); + + Map actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); - final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + assertThat(actualPutRequest.get("index_patterns"), equalTo(indexPatterns)); - assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); - assertThat(actualPutRequest.indexPatterns(), equalTo(indexPatterns)); + Endpoint, PutIndexTemplateResponse, ErrorResponse> actualEndpoint = endpointArgumentCaptor.getValue(); + assertThat(actualEndpoint.method(null), equalTo("PUT")); + assertThat(actualEndpoint.requestUrl(null), equalTo("/_index_template/" + indexTemplateName)); + assertThat(actualEndpoint.queryParameters(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.headers(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.hasRequestBody(), equalTo(true)); + + assertThat(actualEndpoint, instanceOf(JsonEndpoint.class)); + assertThat(((JsonEndpoint)actualEndpoint).responseDeserializer(), equalTo(PutIndexTemplateResponse._DESERIALIZER)); } @Test @@ -139,18 +163,15 @@ void getExistingTemplate_should_return_template_if_template_exists() throws IOEx @Nested class IndexTemplateWithCreateTemplateTests { - private ArgumentCaptor putIndexTemplateRequestArgumentCaptor; private List indexPatterns; @BeforeEach void setUp() { - final OpenSearchTransport openSearchTransport = mock(OpenSearchTransport.class); when(openSearchClient._transport()).thenReturn(openSearchTransport); - when(openSearchTransport.jsonpMapper()).thenReturn(new PreSerializedJsonpMapper()); - - putIndexTemplateRequestArgumentCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); indexPatterns = Collections.singletonList(UUID.randomUUID().toString()); + + when(openSearchClient._transportOptions()).thenReturn(openSearchTransportOptions); } @Test @@ -159,43 +180,64 @@ void putTemplate_with_setTemplateName_performs_putIndexTemplate_request() throws indexTemplate.setTemplateName(indexTemplateName); objectUnderTest.putTemplate(indexTemplate); - verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); - - final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); - - assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); - - assertThat(actualPutRequest.version(), nullValue()); - assertThat(actualPutRequest.indexPatterns(), notNullValue()); - assertThat(actualPutRequest.indexPatterns(), equalTo(Collections.emptyList())); - assertThat(actualPutRequest.template(), nullValue()); - assertThat(actualPutRequest.priority(), nullValue()); - assertThat(actualPutRequest.composedOf(), notNullValue()); - assertThat(actualPutRequest.composedOf(), equalTo(Collections.emptyList())); + verify(openSearchTransport).performRequest( + putIndexTemplateRequestArgumentCaptor.capture(), + endpointArgumentCaptor.capture(), + eq(openSearchTransportOptions) + ); + + final Map actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + + final Endpoint, PutIndexTemplateResponse, ErrorResponse> actualEndpoint = endpointArgumentCaptor.getValue(); + assertThat(actualEndpoint.method(null), equalTo("PUT")); + assertThat(actualEndpoint.requestUrl(null), equalTo("/_index_template/" + indexTemplateName)); + assertThat(actualEndpoint.queryParameters(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.headers(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.hasRequestBody(), equalTo(true)); + + assertThat(actualEndpoint, instanceOf(JsonEndpoint.class)); + assertThat(((JsonEndpoint)actualEndpoint).responseDeserializer(), equalTo(PutIndexTemplateResponse._DESERIALIZER)); + + assertThat(actualPutRequest.get("version"), nullValue()); + assertThat(actualPutRequest.get("index_patterns"), nullValue()); + assertThat(actualPutRequest.get("template"), nullValue()); + assertThat(actualPutRequest.get("priority"), nullValue()); + assertThat(actualPutRequest.get("composed_of"), nullValue()); } @Test void putTemplate_with_setIndexPatterns_performs_putIndexTemplate_request() throws IOException { final List indexPatterns = Collections.singletonList(UUID.randomUUID().toString()); - final IndexConfiguration indexConfiguration = mock(IndexConfiguration.class); final IndexTemplate indexTemplate = new ComposableIndexTemplate(new HashMap<>()); indexTemplate.setTemplateName(indexTemplateName); indexTemplate.setIndexPatterns(indexPatterns); objectUnderTest.putTemplate(indexTemplate); - verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); + verify(openSearchTransport).performRequest( + putIndexTemplateRequestArgumentCaptor.capture(), + endpointArgumentCaptor.capture(), + eq(openSearchTransportOptions) + ); + + Map actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + + assertThat(actualPutRequest.get("index_patterns"), equalTo(indexPatterns)); - final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + Endpoint, PutIndexTemplateResponse, ErrorResponse> actualEndpoint = endpointArgumentCaptor.getValue(); + assertThat(actualEndpoint.method(null), equalTo("PUT")); + assertThat(actualEndpoint.requestUrl(null), equalTo("/_index_template/" + indexTemplateName)); + assertThat(actualEndpoint.queryParameters(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.headers(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.hasRequestBody(), equalTo(true)); - assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); - assertThat(actualPutRequest.indexPatterns(), equalTo(indexPatterns)); + assertThat(actualEndpoint, instanceOf(JsonEndpoint.class)); + assertThat(((JsonEndpoint)actualEndpoint).responseDeserializer(), equalTo(PutIndexTemplateResponse._DESERIALIZER)); - assertThat(actualPutRequest.version(), nullValue()); - assertThat(actualPutRequest.template(), nullValue()); - assertThat(actualPutRequest.priority(), nullValue()); - assertThat(actualPutRequest.composedOf(), notNullValue()); - assertThat(actualPutRequest.composedOf(), equalTo(Collections.emptyList())); + assertThat(actualPutRequest, not(hasKey("template"))); + assertThat(actualPutRequest, not(hasKey("priority"))); + assertThat(actualPutRequest, not(hasKey("composedOf"))); + assertThat(actualPutRequest, not(hasKey("template"))); } @Test @@ -205,7 +247,6 @@ void putTemplate_with_defined_template_values_performs_putIndexTemplate_request( final String numberOfShards = Integer.toString(random.nextInt(1000) + 100); final List composedOf = Collections.singletonList(UUID.randomUUID().toString()); - final IndexConfiguration indexConfiguration = mock(IndexConfiguration.class); final IndexTemplate indexTemplate = new ComposableIndexTemplate( Map.of("version", version, "priority", priority, @@ -220,21 +261,40 @@ void putTemplate_with_defined_template_values_performs_putIndexTemplate_request( indexTemplate.setIndexPatterns(indexPatterns); objectUnderTest.putTemplate(indexTemplate); - verify(openSearchIndicesClient).putIndexTemplate(putIndexTemplateRequestArgumentCaptor.capture()); - - final PutIndexTemplateRequest actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); - - assertThat(actualPutRequest.name(), equalTo(indexTemplateName)); - assertThat(actualPutRequest.indexPatterns(), equalTo(indexPatterns)); - assertThat(actualPutRequest.version(), equalTo(version)); - assertThat(actualPutRequest.priority(), equalTo(priority)); - assertThat(actualPutRequest.composedOf(), equalTo(composedOf)); - assertThat(actualPutRequest.template(), notNullValue()); - assertThat(actualPutRequest.template().mappings(), notNullValue()); - assertThat(actualPutRequest.template().mappings().dateDetection(), equalTo(true)); - assertThat(actualPutRequest.template().settings(), notNullValue()); - assertThat(actualPutRequest.template().settings().index(), notNullValue()); - assertThat(actualPutRequest.template().settings().index().numberOfShards(), equalTo(numberOfShards)); + verify(openSearchTransport).performRequest( + putIndexTemplateRequestArgumentCaptor.capture(), + endpointArgumentCaptor.capture(), + eq(openSearchTransportOptions) + ); + + final Map actualPutRequest = putIndexTemplateRequestArgumentCaptor.getValue(); + + assertThat(actualPutRequest.get("index_patterns"), equalTo(indexPatterns)); + + final Endpoint, PutIndexTemplateResponse, ErrorResponse> actualEndpoint = endpointArgumentCaptor.getValue(); + assertThat(actualEndpoint.method(null), equalTo("PUT")); + assertThat(actualEndpoint.requestUrl(null), equalTo("/_index_template/" + indexTemplateName)); + assertThat(actualEndpoint.queryParameters(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.headers(null), equalTo(Collections.emptyMap())); + assertThat(actualEndpoint.hasRequestBody(), equalTo(true)); + + assertThat(actualEndpoint, instanceOf(JsonEndpoint.class)); + assertThat(((JsonEndpoint)actualEndpoint).responseDeserializer(), equalTo(PutIndexTemplateResponse._DESERIALIZER)); + + assertThat(actualPutRequest, hasKey("template")); + assertThat(actualPutRequest.get("version"), equalTo(version)); + assertThat(actualPutRequest.get("priority"), equalTo(priority)); + assertThat(actualPutRequest.get("composed_of"), equalTo(composedOf)); + assertThat(actualPutRequest.get("template"), notNullValue()); + assertThat(actualPutRequest.get("template"), instanceOf(Map.class)); + Map actualTemplate = (Map) actualPutRequest.get("template"); + assertThat(actualTemplate.get("mappings"), instanceOf(Map.class)); + assertThat(((Map) actualTemplate.get("mappings")).get("date_detection"), equalTo(true)); + assertThat(actualTemplate.get("settings"), instanceOf(Map.class)); + Map actualSettings = (Map) actualTemplate.get("settings"); + assertThat(actualSettings.get("index"), instanceOf(Map.class)); + Map actualIndex = (Map) actualSettings.get("index"); + assertThat(actualIndex.get("number_of_shards"), equalTo(numberOfShards)); } } } \ No newline at end of file