Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support versionType and pipeline parameters in reindex request #3087

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.sksamuel.elastic4s.requests.reindex

import com.sksamuel.elastic4s.handlers.reindex.ReindexBuilderFn
import com.sksamuel.elastic4s.requests.common.VersionType.ExternalGte
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class ReindexBuilderFnTest extends AnyFunSuite with Matchers {
import com.sksamuel.elastic4s.ElasticApi._

test("reindex content builder should support version type") {
val req = reindex("source", "target").versionType(ExternalGte)

ReindexBuilderFn(req).string shouldBe
"""{"source":{"index":["source"]},"dest":{"index":"target","version_type":"external_gte"}}""".stripMargin
}

test("reindex content builder should support pipeline") {
val req = reindex("source", "target").pipeline("my_pipeline")

ReindexBuilderFn(req).string shouldBe
"""{"source":{"index":["source"]},"dest":{"index":"target","pipeline":"my_pipeline"}}""".stripMargin
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ sealed trait VersionType
object VersionType {

def valueOf(str: String): VersionType = str.toLowerCase match {
case "external" => VersionType.External
case "externalgte" | "external_gte" => VersionType.ExternalGte
case _ => VersionType.Internal
case "external" | "externalgt" | "external_gt" => VersionType.External
case "externalgte" | "external_gte" => VersionType.ExternalGte
case _ => VersionType.Internal
}

case object External extends VersionType
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.sksamuel.elastic4s.requests.reindex

import com.sksamuel.elastic4s.ext.OptionImplicits._
import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice}
import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice, VersionType}
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.{Index, Indexes}

import scala.concurrent.duration.FiniteDuration

case class ReindexRequest(sourceIndexes: Indexes,
Expand All @@ -30,7 +29,9 @@ case class ReindexRequest(sourceIndexes: Indexes,
size: Option[Int] = None,
createOnly: Option[Boolean] = None,
slices: Option[Int] = None,
slice: Option[Slice] = None) {
slice: Option[Slice] = None,
versionType: Option[VersionType] = None,
pipeline: Option[String] = None) {

def remote(uri: String): ReindexRequest = copy(remoteHost = Option(uri))
def remote(uri: String, user: String, pass: String): ReindexRequest =
Expand Down Expand Up @@ -73,4 +74,9 @@ case class ReindexRequest(sourceIndexes: Indexes,
def createOnly(createOnly: Boolean): ReindexRequest = copy(createOnly = createOnly.some)
def slice(slice: Slice): ReindexRequest = copy(slice = slice.some)
def slices(slices: Int): ReindexRequest = copy(slices = slices.some)

def versionType(versionType: String): ReindexRequest = this.versionType(VersionType.valueOf(versionType))
def versionType(versionType: VersionType): ReindexRequest = copy(versionType = versionType.some)

def pipeline(pipeline: String): ReindexRequest = copy(pipeline = pipeline.some)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ object ReindexBuilderFn {
builder.startObject("dest")
builder.field("index", request.targetIndex.name)

request.versionType.foreach(versionType => builder.field("version_type", handlers.VersionTypeHttpString(versionType)))

request.createOnly.foreach {
case true => builder.field("op_type", "create")
case false => builder.field("op_type", "index")
}

request.pipeline.foreach(builder.field("pipeline", _))

// end dest
builder.endObject()
}
Expand Down