From df4e18dff738e83ad2f2599860937cd106bc3016 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:24:09 +0100 Subject: [PATCH 1/4] Elasticsearch: keep path from base URI --- .../elasticsearch/impl/ElasticsearchSimpleFlowStage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala index d2d722a722..c9bce43554 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala @@ -79,7 +79,7 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( log.debug("Posting data to Elasticsearch: {}", json) if (json.nonEmpty) { - val uri = baseUri.withPath(Path(endpoint)) + val uri = baseUri.withPath(baseUri.path / endpoint) val request = HttpRequest(HttpMethods.POST) .withUri(uri) .withEntity(HttpEntity(NDJsonProtocol.`application/x-ndjson`, json)) From 4fe7a55b198ab346ab00cd9393c2cad02eb81307 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:30:50 +0100 Subject: [PATCH 2/4] remove import --- .../elasticsearch/impl/ElasticsearchSimpleFlowStage.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala index c9bce43554..3897a8788f 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala @@ -6,7 +6,6 @@ package akka.stream.alpakka.elasticsearch.impl import akka.annotation.InternalApi import akka.http.scaladsl.HttpExt -import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.alpakka.elasticsearch._ From 984dcbee62fa379930cf9734d408921e04f6604b Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:59:55 +0100 Subject: [PATCH 3/4] build endpoint path with DSL --- .../elasticsearch/impl/ElasticsearchSimpleFlowStage.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala index 3897a8788f..12ec50e3c8 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala @@ -70,7 +70,9 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( override def onPull(): Unit = tryPull() override def onPush(): Unit = { - val endpoint = if (settings.allowExplicitIndex) "/_bulk" else s"/${elasticsearchParams.indexName}/_bulk" + val endpoint = + if (settings.allowExplicitIndex) baseUri.path / "_bulk" + else baseUri.path / elasticsearchParams.indexName / "_bulk" val (messages, resultsPassthrough) = grab(in) inflight = true val json: String = restApi.toJson(messages) @@ -78,7 +80,7 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( log.debug("Posting data to Elasticsearch: {}", json) if (json.nonEmpty) { - val uri = baseUri.withPath(baseUri.path / endpoint) + val uri = baseUri.withPath(endpoint) val request = HttpRequest(HttpMethods.POST) .withUri(uri) .withEntity(HttpEntity(NDJsonProtocol.`application/x-ndjson`, json)) From 36d85c2c8405499d488d913314667e2cd48adbac Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 1 Dec 2023 09:15:19 +0100 Subject: [PATCH 4/4] conditional slash --- .../elasticsearch/impl/ElasticsearchSimpleFlowStage.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala index 12ec50e3c8..2780b17972 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala @@ -71,8 +71,8 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( override def onPush(): Unit = { val endpoint = - if (settings.allowExplicitIndex) baseUri.path / "_bulk" - else baseUri.path / elasticsearchParams.indexName / "_bulk" + if (settings.allowExplicitIndex) baseUri.path ?/ "_bulk" + else baseUri.path ?/ elasticsearchParams.indexName / "_bulk" val (messages, resultsPassthrough) = grab(in) inflight = true val json: String = restApi.toJson(messages)