Skip to content

Commit

Permalink
Correctly support custom properties in composable index templates in …
Browse files Browse the repository at this point in the history
…the OpenSearch sink. This resolves opensearch-project#3506.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed Nov 6, 2023
1 parent 60f84ad commit a71731f
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private DeclaredOpenSearchVersion(final Distribution distribution, final String
}

static DeclaredOpenSearchVersion parse(final String versionString) {
if(versionString == null) {
if(versionString == null || versionString.isEmpty()) {
return DEFAULT;
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import java.util.Map;
import java.util.Optional;

public class ComposableIndexTemplate implements IndexTemplate {
class ComposableIndexTemplate implements IndexTemplate {
static final String TEMPLATE_KEY = "template";
static final String INDEX_SETTINGS_KEY = "settings";

private final Map<String, Object> indexTemplateMap;
private String name;
Expand All @@ -18,7 +20,6 @@ public ComposableIndexTemplate(final Map<String, Object> indexTemplateMap) {
@Override
public void setTemplateName(final String name) {
this.name = name;

}

@Override
Expand All @@ -28,7 +29,11 @@ public void setIndexPatterns(final List<String> indexPatterns) {

@Override
public void putCustomSetting(final String name, final Object value) {
Map<String, Object> template = (Map<String, Object>) indexTemplateMap.computeIfAbsent(TEMPLATE_KEY, key -> new HashMap<>());

Map<String, Object> settings = (Map<String, Object>) template.computeIfAbsent(INDEX_SETTINGS_KEY, key -> new HashMap<>());

settings.put(name, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.ObjectBuilderDeserializer;
import org.opensearch.client.json.ObjectDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse;
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping;
import org.opensearch.client.opensearch.indices.PutIndexTemplateResponse;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.transport.endpoints.SimpleEndpoint;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class ComposableTemplateAPIWrapper implements IndexTemplateAPIWrapper<GetIndexTemplateResponse> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final OpenSearchClient openSearchClient;

public ComposableTemplateAPIWrapper(final OpenSearchClient openSearchClient) {
Expand All @@ -34,19 +29,12 @@ public void putTemplate(final IndexTemplate indexTemplate) throws IOException {
}

final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate;
final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(
composableIndexTemplate.getIndexTemplateMap());
Map<String, Object> indexTemplateMap = composableIndexTemplate.getIndexTemplateMap();

final ByteArrayInputStream byteIn = new ByteArrayInputStream(
indexTemplateString.getBytes(StandardCharsets.UTF_8));
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);

final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer
.getJsonpDeserializer(composableIndexTemplate.getName())
.deserialize(parser, mapper);

openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest);
openSearchClient._transport().performRequest(
indexTemplateMap,
createEndpoint(composableIndexTemplate),
openSearchClient._transportOptions());
}

@Override
Expand All @@ -66,24 +54,15 @@ public Optional<GetIndexTemplateResponse> getTemplate(final String indexTemplate
return Optional.of(openSearchClient.indices().getIndexTemplate(getRequest));
}

private static class PutIndexTemplateRequestDeserializer {
private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer<PutIndexTemplateRequest.Builder> objectDeserializer) {
private Endpoint<Map<String, Object>, PutIndexTemplateResponse, ErrorResponse> createEndpoint(final ComposableIndexTemplate composableIndexTemplate) {
final String path = "/_index_template/" + composableIndexTemplate.getName();

objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"index_patterns");
objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version");
objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority");
objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"composed_of");
objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template");
}

static JsonpDeserializer<PutIndexTemplateRequest> getJsonpDeserializer(final String name) {
return ObjectBuilderDeserializer
.lazy(
() -> new PutIndexTemplateRequest.Builder().name(name),
PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer);
}
return new SimpleEndpoint<>(
request -> "PUT",
request -> path,
request -> Collections.emptyMap(),
SimpleEndpoint.emptyMap(),
true,
PutIndexTemplateResponse._DESERIALIZER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.Map;
import java.util.Optional;

public class LegacyIndexTemplate implements IndexTemplate {
class LegacyIndexTemplate implements IndexTemplate {

public static final String SETTINGS_KEY = "settings";
private final Map<String, Object> templateMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ComposableIndexTemplate.INDEX_SETTINGS_KEY;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ComposableIndexTemplate.TEMPLATE_KEY;

@ExtendWith(MockitoExtension.class)
class ComposableIndexTemplateStrategyTest {
Expand Down Expand Up @@ -188,5 +192,90 @@ void getVersion_returns_version_from_root_map_when_provided_as_int() {
assertThat(optionalVersion.isPresent(), equalTo(true));
assertThat(optionalVersion.get(), equalTo((long) version));
}

@Test
void putCustomSetting_with_no_existing_template_adds_template_and_settings() {
final org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplate indexTemplate =
createObjectUnderTest().createIndexTemplate(providedTemplateMap);

String customKey = UUID.randomUUID().toString();
String customValue = UUID.randomUUID().toString();

indexTemplate.putCustomSetting(customKey, customValue);

Map<String, Object> indexTemplateMap = ((ComposableIndexTemplate) indexTemplate).getIndexTemplateMap();

assertThat(indexTemplateMap, hasKey(TEMPLATE_KEY));
assertThat(indexTemplateMap.get(TEMPLATE_KEY), instanceOf(Map.class));
Map<String, Object> templateMap = (Map<String, Object>) indexTemplateMap.get(TEMPLATE_KEY);
assertThat(templateMap, hasKey(INDEX_SETTINGS_KEY));
assertThat(templateMap.get(INDEX_SETTINGS_KEY), instanceOf(Map.class));
Map<String, Object> settingsMap = (Map<String, Object>) templateMap.get(INDEX_SETTINGS_KEY);
assertThat(settingsMap, hasKey(customKey));
assertThat(settingsMap.get(customKey), equalTo(customValue));
}

@Test
void putCustomSetting_with_existing_template_adds_settings_to_that_template() {
String existingKey = UUID.randomUUID().toString();
String existingValue = UUID.randomUUID().toString();
Map<String, Object> template = new HashMap<>();
template.put(existingKey, existingValue);
providedTemplateMap.put(TEMPLATE_KEY, template);

final org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplate indexTemplate =
createObjectUnderTest().createIndexTemplate(providedTemplateMap);

String customKey = UUID.randomUUID().toString();
String customValue = UUID.randomUUID().toString();

indexTemplate.putCustomSetting(customKey, customValue);

Map<String, Object> indexTemplateMap = ((ComposableIndexTemplate) indexTemplate).getIndexTemplateMap();

assertThat(indexTemplateMap, hasKey(TEMPLATE_KEY));
assertThat(indexTemplateMap.get(TEMPLATE_KEY), instanceOf(Map.class));
Map<String, Object> templateMap = (Map<String, Object>) indexTemplateMap.get(TEMPLATE_KEY);
assertThat(templateMap, hasKey(INDEX_SETTINGS_KEY));
assertThat(templateMap, hasKey(existingKey));
assertThat(templateMap.get(existingKey), equalTo(existingValue));
assertThat(templateMap.get(INDEX_SETTINGS_KEY), instanceOf(Map.class));
Map<String, Object> settingsMap = (Map<String, Object>) templateMap.get(INDEX_SETTINGS_KEY);
assertThat(settingsMap, hasKey(customKey));
assertThat(settingsMap.get(customKey), equalTo(customValue));
}

@Test
void putCustomSetting_with_existing_template_and_settings_puts_settings_to_that_settings() {
String existingKey = UUID.randomUUID().toString();
String existingValue = UUID.randomUUID().toString();
Map<String, Object> template = new HashMap<>();
HashMap<Object, Object> existingSettings = new HashMap<>();
existingSettings.put(existingKey, existingValue);
template.put(INDEX_SETTINGS_KEY, existingSettings);

providedTemplateMap.put(TEMPLATE_KEY, template);

final org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplate indexTemplate =
createObjectUnderTest().createIndexTemplate(providedTemplateMap);

String customKey = UUID.randomUUID().toString();
String customValue = UUID.randomUUID().toString();

indexTemplate.putCustomSetting(customKey, customValue);

Map<String, Object> indexTemplateMap = ((ComposableIndexTemplate) indexTemplate).getIndexTemplateMap();

assertThat(indexTemplateMap, hasKey(TEMPLATE_KEY));
assertThat(indexTemplateMap.get(TEMPLATE_KEY), instanceOf(Map.class));
Map<String, Object> templateMap = (Map<String, Object>) indexTemplateMap.get(TEMPLATE_KEY);
assertThat(templateMap, hasKey(INDEX_SETTINGS_KEY));
assertThat(templateMap.get(INDEX_SETTINGS_KEY), instanceOf(Map.class));
Map<String, Object> settingsMap = (Map<String, Object>) templateMap.get(INDEX_SETTINGS_KEY);
assertThat(settingsMap, hasKey(customKey));
assertThat(settingsMap.get(customKey), equalTo(customValue));
assertThat(settingsMap, hasKey(existingKey));
assertThat(settingsMap.get(existingKey), equalTo(existingValue));
}
}
}
Loading

0 comments on commit a71731f

Please sign in to comment.