diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index afda5ba0e7449..133daa8cd6a68 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -144,7 +144,7 @@ public boolean deleteIndex(String index) throws IOException { public boolean deleteDocument(String index, String documentId) throws IOException { final DeleteRequest req = new DeleteRequest.Builder() - .index(config.getIndexName()) + .index(index) .id(documentId) .build(); @@ -156,7 +156,7 @@ public boolean deleteDocument(String index, String documentId) throws IOExceptio public boolean indexDocument(String index, String documentId, String documentSource) throws IOException { final Map mapped = objectMapper.readValue(documentSource, Map.class); final IndexRequest indexRequest = new IndexRequest.Builder<>() - .index(config.getIndexName()) + .index(index) .document(mapped) .id(documentId) .build(); diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java index 9a2cb4ab5658a..f1da6fd0c7e15 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java @@ -28,6 +28,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import co.elastic.clients.transport.ElasticsearchTransport; import com.fasterxml.jackson.core.JsonParseException; @@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericObject; @@ -152,6 +154,7 @@ public Object getNativeObject() { }); when(mockRecord.getSchema()).thenAnswer((Answer>>) invocation -> kvSchema); + when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis())); } @AfterMethod(alwaysRun = true) @@ -209,6 +212,16 @@ public final void send100Test() throws Exception { verify(mockRecord, times(100)).ack(); } + @Test + public final void send1WithFormattedIndexTest() throws Exception { + map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}"); + sink.open(map, mockSinkContext); + send(1); + verify(mockRecord, times(1)).ack(); + String value = getHitIdAtIndex("test-formatted-index-*", 0); + assertTrue(StringUtils.isNotBlank(value)); + } + @Test public final void sendNoSchemaTest() throws Exception {