From c9110a153efaa064bf2649eb3db377df23fd221d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Velimir=20Milinkovi=C4=87?= <81649656+mvelimir@users.noreply.github.com> Date: Thu, 23 Feb 2023 20:35:32 +0100 Subject: [PATCH] Use module pattern (remove execute method on requests) (#83) --- .../example/src/main/scala/example/Main.scala | 17 +- .../example/RepositoriesElasticsearch.scala | 38 +-- .../example/external/github/RepoFetcher.scala | 3 +- .../zio/elasticsearch/HttpExecutorSpec.scala | 245 ++++++++++-------- .../zio/elasticsearch/IntegrationSpec.scala | 7 +- .../zio/elasticsearch/ElasticExecutor.scala | 11 +- .../zio/elasticsearch/ElasticRequest.scala | 4 - .../zio/elasticsearch/Elasticsearch.scala | 35 +++ .../HttpElasticExecutorSpec.scala | 84 +++--- .../zio/elasticsearch/WireMockSpec.scala | 3 +- 10 files changed, 257 insertions(+), 190 deletions(-) create mode 100644 modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala diff --git a/modules/example/src/main/scala/example/Main.scala b/modules/example/src/main/scala/example/Main.scala index de3a3e7fd..32de83ea2 100644 --- a/modules/example/src/main/scala/example/Main.scala +++ b/modules/example/src/main/scala/example/Main.scala @@ -23,7 +23,7 @@ import sttp.client3.SttpBackend import sttp.client3.httpclient.zio.HttpClientZioBackend import zio._ import zio.config.getConfig -import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest} +import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest, Elasticsearch} import zio.http.{Server, ServerConfig} import scala.io.Source @@ -38,25 +38,26 @@ object Main extends ZIOAppDefault { AppConfig.live, elasticConfigLive, ElasticExecutor.live, + Elasticsearch.layer, HttpClientZioBackend.layer() ) } - private[this] def prepare: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] = { - val deleteIndex: RIO[ElasticExecutor, Unit] = + private[this] def prepare: RIO[SttpBackend[Task, Any] with Elasticsearch, Unit] = { + val deleteIndex: RIO[Elasticsearch, Unit] = for { _ <- ZIO.logInfo(s"Deleting index '$Index'...") - _ <- ElasticRequest.deleteIndex(Index).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteIndex(Index)) } yield () - val createIndex: RIO[ElasticExecutor, Unit] = + val createIndex: RIO[Elasticsearch, Unit] = for { _ <- ZIO.logInfo(s"Creating index '$Index'...") mapping <- ZIO.fromTry(Using(Source.fromURL(getClass.getResource("/mapping.json")))(_.mkString)) - _ <- ElasticRequest.createIndex(Index, Some(mapping)).execute + _ <- Elasticsearch.execute(ElasticRequest.createIndex(Index, Some(mapping))) } yield () - val populate: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] = + val populate: RIO[SttpBackend[Task, Any] with Elasticsearch, Unit] = (for { repositories <- RepoFetcher.fetchAllByOrganization(organization) _ <- ZIO.logInfo("Adding GitHub repositories...") @@ -66,7 +67,7 @@ object Main extends ZIOAppDefault { deleteIndex *> createIndex *> populate } - private[this] def runServer: RIO[HttpConfig with ElasticExecutor, ExitCode] = { + private[this] def runServer: RIO[HttpConfig with Elasticsearch, ExitCode] = { val serverConfigLive = ZLayer.fromFunction((http: HttpConfig) => ServerConfig.default.port(http.port)) (for { diff --git a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala index f5294357b..8a84df0d1 100644 --- a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala +++ b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala @@ -22,58 +22,60 @@ import zio.elasticsearch.{ CreationOutcome, DeletionOutcome, DocumentId, - ElasticExecutor, ElasticQuery, ElasticRequest, + Elasticsearch, Routing } import zio.prelude.Newtype.unsafeWrap -final case class RepositoriesElasticsearch(executor: ElasticExecutor) { +final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) { def findAll(): Task[List[GitHubRepo]] = - executor.execute(ElasticRequest.search[GitHubRepo](Index, matchAll)) + elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, matchAll)) def findById(organization: String, id: String): Task[Option[GitHubRepo]] = for { routing <- routingOf(organization) - req = ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing) - res <- executor.execute(req) + res <- elasticsearch.execute(ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing)) } yield res def create(repository: GitHubRepo): Task[CreationOutcome] = for { routing <- routingOf(repository.organization) - req = ElasticRequest.create(Index, DocumentId(repository.id), repository).routing(routing).refreshTrue - res <- executor.execute(req) + res <- elasticsearch.execute( + ElasticRequest.create(Index, DocumentId(repository.id), repository).routing(routing).refreshTrue + ) } yield res def createAll(repositories: List[GitHubRepo]): Task[Unit] = for { routing <- routingOf(organization) - reqs = repositories.map { repository => - ElasticRequest.create[GitHubRepo](Index, unsafeWrap(DocumentId)(repository.id), repository) - } - bulkReq = ElasticRequest.bulk(reqs: _*).routing(routing) - _ <- executor.execute(bulkReq) + _ <- elasticsearch.execute( + ElasticRequest + .bulk(repositories.map { repository => + ElasticRequest.create[GitHubRepo](Index, unsafeWrap(DocumentId)(repository.id), repository) + }: _*) + .routing(routing) + ) } yield () def upsert(id: String, repository: GitHubRepo): Task[Unit] = for { routing <- routingOf(repository.organization) - req = ElasticRequest.upsert(Index, DocumentId(id), repository).routing(routing).refresh(value = true) - _ <- executor.execute(req) + _ <- elasticsearch.execute( + ElasticRequest.upsert(Index, DocumentId(id), repository).routing(routing).refresh(value = true) + ) } yield () def remove(organization: String, id: String): Task[DeletionOutcome] = for { routing <- routingOf(organization) - req = ElasticRequest.deleteById(Index, DocumentId(id)).routing(routing).refreshFalse - res <- executor.execute(req) + res <- elasticsearch.execute(ElasticRequest.deleteById(Index, DocumentId(id)).routing(routing).refreshFalse) } yield res def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] = - executor.execute(ElasticRequest.search[GitHubRepo](Index, query)) + elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, query)) private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] = Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e)) @@ -103,6 +105,6 @@ object RepositoriesElasticsearch { def search(query: ElasticQuery[_]): RIO[RepositoriesElasticsearch, List[GitHubRepo]] = ZIO.serviceWithZIO[RepositoriesElasticsearch](_.search(query)) - lazy val live: URLayer[ElasticExecutor, RepositoriesElasticsearch] = + lazy val live: URLayer[Elasticsearch, RepositoriesElasticsearch] = ZLayer.fromFunction(RepositoriesElasticsearch(_)) } diff --git a/modules/example/src/main/scala/example/external/github/RepoFetcher.scala b/modules/example/src/main/scala/example/external/github/RepoFetcher.scala index 5cc75f336..0b9a6ee1f 100644 --- a/modules/example/src/main/scala/example/external/github/RepoFetcher.scala +++ b/modules/example/src/main/scala/example/external/github/RepoFetcher.scala @@ -30,8 +30,7 @@ object RepoFetcher { ): RIO[SttpBackend[Task, Any], List[GitHubRepo]] = for { client <- ZIO.service[SttpBackend[Task, Any]] - req = basicRequest.get(uri"https://api.github.com/orgs/$organization/repos?per_page=$limit") - res <- req.send(client) + res <- basicRequest.get(uri"https://api.github.com/orgs/$organization/repos?per_page=$limit").send(client) } yield res.body.toOption .map(_.fromJson[List[RepoResponse]].fold(_ => Nil, _.map(GitHubRepo.fromResponse).toList)) .getOrElse(Nil) diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index f380cdeba..39127376c 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -30,51 +30,53 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully create document") { checkOnce(genCustomer) { customer => for { - docId <- ElasticRequest.create[CustomerDocument](index, customer).execute - res <- ElasticRequest.getById[CustomerDocument](index, docId).execute + docId <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, customer)) + res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, docId)) } yield assert(res)(isSome(equalTo(customer))) } }, test("successfully create document with ID given") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => - assertZIO(ElasticRequest.create[CustomerDocument](index, documentId, customer).execute)(equalTo(Created)) + assertZIO(ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, customer)))( + equalTo(Created) + ) } }, test("return 'AlreadyExists' if document with given ID already exists") { checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, customer1, customer2) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer1).execute - res <- ElasticRequest.create[CustomerDocument](index, documentId, customer2).execute + _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer1)) + res <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, customer2)) } yield assert(res)(equalTo(AlreadyExists)) } } ), suite("creating index")( test("successfully create index") { - assertZIO(ElasticRequest.createIndex(createIndexTestName, None).execute)(equalTo(Created)) + assertZIO(ElasticExecutor.execute(ElasticRequest.createIndex(createIndexTestName, None)))(equalTo(Created)) }, test("return 'AlreadyExists' if index already exists") { for { - _ <- ElasticRequest.createIndex(createIndexTestName, None).execute - res <- ElasticRequest.createIndex(createIndexTestName, None).execute + _ <- ElasticExecutor.execute(ElasticRequest.createIndex(createIndexTestName, None)) + res <- ElasticExecutor.execute(ElasticRequest.createIndex(createIndexTestName, None)) } yield assert(res)(equalTo(AlreadyExists)) } - ) @@ after(ElasticRequest.deleteIndex(createIndexTestName).execute.orDie), + ) @@ after(ElasticExecutor.execute(ElasticRequest.deleteIndex(createIndexTestName)).orDie), suite("creating or updating document")( test("successfully create document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield assert(doc)(isSome(equalTo(customer))) } }, test("successfully update document") { checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, firstCustomer, secondCustomer) => for { - _ <- ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer).execute - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer).execute - doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer)) + _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer)) + doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield assert(doc)(isSome(equalTo(secondCustomer))) } } @@ -83,14 +85,14 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully delete existing document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.deleteById(index, documentId).execute + _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + res <- ElasticExecutor.execute(ElasticRequest.deleteById(index, documentId)) } yield assert(res)(equalTo(Deleted)) } }, test("return 'NotFound' if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.deleteById(index, documentId).execute)(equalTo(NotFound)) + assertZIO(ElasticExecutor.execute(ElasticRequest.deleteById(index, documentId)))(equalTo(NotFound)) } } ), @@ -98,14 +100,14 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully delete existing index") { checkOnce(genIndexName) { name => for { - _ <- ElasticRequest.createIndex(name, None).execute - res <- ElasticRequest.deleteIndex(name).execute + _ <- ElasticExecutor.execute(ElasticRequest.createIndex(name, None)) + res <- ElasticExecutor.execute(ElasticRequest.deleteIndex(name)) } yield assert(res)(equalTo(Deleted)) } }, test("return 'NotFound' if index does not exists") { checkOnce(genIndexName) { name => - assertZIO(ElasticRequest.deleteIndex(name).execute)(equalTo(NotFound)) + assertZIO(ElasticExecutor.execute(ElasticRequest.deleteIndex(name)))(equalTo(NotFound)) } } ), @@ -113,14 +115,14 @@ object HttpExecutorSpec extends IntegrationSpec { test("return true if the document exists") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.exists(index, documentId).execute + _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + res <- ElasticExecutor.execute(ElasticRequest.exists(index, documentId)) } yield assert(res)(isTrue) } }, test("return false if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.exists(index, documentId).execute)(isFalse) + assertZIO(ElasticExecutor.execute(ElasticRequest.exists(index, documentId)))(isFalse) } } ), @@ -128,21 +130,21 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully return document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield assert(res)(isSome(equalTo(customer))) } }, test("return None if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.getById[CustomerDocument](index, documentId).execute)(isNone) + assertZIO(ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)))(isNone) } }, test("fail with throwable if decoding fails") { checkOnce(genDocumentId, genEmployee) { (documentId, employee) => val result = for { - _ <- ElasticRequest.upsert[EmployeeDocument](index, documentId, employee).execute - res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- ElasticExecutor.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee)) + res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield res assertZIO(result.exit)( @@ -156,36 +158,42 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + ElasticExecutor.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = range("balance").gte(100) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(isNonEmpty) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), test("fail if any of results cannot be decoded") { checkOnce(genDocumentId, genDocumentId, genEmployee, genCustomer) { (employeeDocumentId, customerDocumentId, employee, customer) => val result = for { - _ <- ElasticRequest.deleteByQuery(secondSearchIndex, matchAll).execute + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](secondSearchIndex, customerDocumentId, customer).execute - _ <- ElasticRequest - .upsert[EmployeeDocument](secondSearchIndex, employeeDocumentId, employee) - .refreshTrue - .execute + ElasticExecutor.execute( + ElasticRequest.upsert[CustomerDocument](secondSearchIndex, customerDocumentId, customer) + ) + _ <- ElasticExecutor.execute( + ElasticRequest + .upsert[EmployeeDocument](secondSearchIndex, employeeDocumentId, employee) + .refreshTrue + ) query = range("age").gte(0) - res <- ElasticRequest.search[CustomerDocument](secondSearchIndex, query).execute + res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](secondSearchIndex, query)) } yield res assertZIO(result.exit)( @@ -197,69 +205,78 @@ object HttpExecutorSpec extends IntegrationSpec { ) } } @@ around( - ElasticRequest.createIndex(secondSearchIndex, None).execute, - ElasticRequest.deleteIndex(secondSearchIndex).execute.orDie + ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex, None)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie ), test("search for a document which contains a term using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + ElasticExecutor.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3)) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), test("search for a document which starts with a term using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + ElasticExecutor.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3)) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), test("search for a document which conforms to a pattern using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + ElasticExecutor.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}") - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ) ) @@ shrinks(0), suite("deleting by query")( @@ -267,42 +284,48 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer, thirdDocumentId, thirdCustomer) => for { - _ <- ElasticRequest - .upsert[CustomerDocument]( - deleteByQueryIndex, - firstDocumentId, - firstCustomer.copy(balance = 150) - ) - .execute + _ <- ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument]( + deleteByQueryIndex, + firstDocumentId, + firstCustomer.copy(balance = 150) + ) + ) _ <- - ElasticRequest - .upsert[CustomerDocument]( - deleteByQueryIndex, - secondDocumentId, - secondCustomer.copy(balance = 350) - ) - .execute + ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument]( + deleteByQueryIndex, + secondDocumentId, + secondCustomer.copy(balance = 350) + ) + ) _ <- - ElasticRequest - .upsert[CustomerDocument]( - deleteByQueryIndex, - thirdDocumentId, - thirdCustomer.copy(balance = 400) - ) - .refreshTrue - .execute + ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument]( + deleteByQueryIndex, + thirdDocumentId, + thirdCustomer.copy(balance = 400) + ) + .refreshTrue + ) deleteQuery = range("balance").gte(300) - _ <- ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue.execute - res <- ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll).execute + _ <- + ElasticExecutor.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue) + res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll)) } yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150)))) } } @@ around( - ElasticRequest.createIndex(deleteByQueryIndex, None).execute, - ElasticRequest.deleteIndex(deleteByQueryIndex).execute.orDie + ElasticExecutor.execute(ElasticRequest.createIndex(deleteByQueryIndex, None)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(deleteByQueryIndex)).orDie ), test("returns NotFound when provided index is missing") { checkOnce(genIndexName) { missingIndex => - assertZIO(ElasticRequest.deleteByQuery(missingIndex, matchAll).execute)(equalTo(NotFound)) + assertZIO(ElasticExecutor.execute(ElasticRequest.deleteByQuery(missingIndex, matchAll)))( + equalTo(NotFound) + ) } } ), @@ -311,29 +334,29 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genDocumentId, genDocumentId, genCustomer) { (firstDocId, secondDocId, thirdDocId, customer) => for { - _ <- ElasticRequest - .create[CustomerDocument](index, firstDocId, customer.copy(id = "randomIdString")) - .execute - _ <- ElasticRequest - .create[CustomerDocument](index, secondDocId, customer.copy(id = "randomIdString2")) - .refreshTrue - .execute + _ <- ElasticExecutor.execute( + ElasticRequest + .create[CustomerDocument](index, firstDocId, customer.copy(id = "randomIdString")) + ) + _ <- ElasticExecutor.execute( + ElasticRequest + .create[CustomerDocument](index, secondDocId, customer.copy(id = "randomIdString2")) + .refreshTrue + ) req1 = ElasticRequest.create[CustomerDocument](index, thirdDocId, customer) req2 = ElasticRequest.create[CustomerDocument](index, customer.copy(id = "randomIdString3")) req3 = ElasticRequest.upsert[CustomerDocument](index, firstDocId, customer.copy(balance = 3000)) req4 = ElasticRequest.deleteById(index, secondDocId) - res <- ElasticRequest.bulk(req1, req2, req3, req4).execute + res <- ElasticExecutor.execute(ElasticRequest.bulk(req1, req2, req3, req4)) } yield assert(res)(isUnit) } } ) ) @@ nondeterministic @@ sequential @@ prepareElasticsearchIndexForTests @@ afterAll( - ElasticRequest.deleteIndex(index).execute.orDie + ElasticExecutor.execute(ElasticRequest.deleteIndex(index)).orDie ) ).provideShared( elasticsearchLayer ) - } - } diff --git a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala index f94c89552..76a641cab 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala @@ -27,8 +27,7 @@ import zio.test.{Assertion, Gen, TestAspect, ZIOSpecDefault, checkN} trait IntegrationSpec extends ZIOSpecDefault { - val elasticsearchLayer: ZLayer[Any, Throwable, ElasticExecutor] = - HttpClientZioBackend.layer() >>> ElasticExecutor.local + val elasticsearchLayer: TaskLayer[ElasticExecutor] = HttpClientZioBackend.layer() >>> ElasticExecutor.local val index: IndexName = IndexName("users") @@ -41,8 +40,8 @@ trait IntegrationSpec extends ZIOSpecDefault { val createIndexTestName: IndexName = IndexName("create-index-test-name") val prepareElasticsearchIndexForTests: TestAspect[Nothing, Any, Throwable, Any] = beforeAll((for { - _ <- ElasticRequest.createIndex(index, None).execute - _ <- ElasticRequest.deleteByQuery(index, matchAll).refreshTrue.execute + _ <- ElasticExecutor.execute(ElasticRequest.createIndex(index, None)) + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(index, matchAll).refreshTrue) } yield ()).provide(elasticsearchLayer)) def genIndexName: Gen[Any, IndexName] = diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index 7ccaffb17..990b4be3b 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -18,19 +18,22 @@ package zio.elasticsearch import sttp.client3.SttpBackend import zio.stm.TMap -import zio.{Task, ULayer, ZLayer} +import zio.{RIO, Task, ULayer, URLayer, ZIO, ZLayer} -trait ElasticExecutor { +private[elasticsearch] trait ElasticExecutor { def execute[A](request: ElasticRequest[A, _]): Task[A] } object ElasticExecutor { - lazy val live: ZLayer[ElasticConfig with SttpBackend[Task, Any], Throwable, ElasticExecutor] = + lazy val live: URLayer[ElasticConfig with SttpBackend[Task, Any], ElasticExecutor] = ZLayer.fromFunction(HttpElasticExecutor.apply _) - lazy val local: ZLayer[SttpBackend[Task, Any], Throwable, ElasticExecutor] = + lazy val local: URLayer[SttpBackend[Task, Any], ElasticExecutor] = ZLayer.succeed(ElasticConfig.Default) >>> live lazy val test: ULayer[TestExecutor] = ZLayer(TMap.empty[IndexName, TMap[DocumentId, Document]].map(TestExecutor).commit) + + private[elasticsearch] def execute[A](request: ElasticRequest[A, _]): RIO[ElasticExecutor, A] = + ZIO.serviceWithZIO[ElasticExecutor](_.execute(request)) } diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index abaa01136..9b27b9200 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -21,16 +21,12 @@ import zio.elasticsearch.Routing.{Routing, WithRouting} import zio.prelude._ import zio.schema.Schema import zio.schema.codec.JsonCodec.JsonDecoder -import zio.{RIO, ZIO} import scala.annotation.unused import scala.language.implicitConversions sealed trait ElasticRequest[+A, ERT <: ElasticRequestType] { self => - final def execute: RIO[ElasticExecutor, A] = - ZIO.serviceWithZIO[ElasticExecutor](_.execute(self)) - final def map[B](f: A => Either[DecodingException, B]): ElasticRequest[B, ERT] = ElasticRequest.Map(self, f) final def refresh(value: Boolean)(implicit wr: WithRefresh[ERT]): ElasticRequest[A, ERT] = diff --git a/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala b/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala new file mode 100644 index 000000000..a06ab56d9 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch + +import zio.{RIO, Task, URLayer, ZIO, ZLayer} + +trait Elasticsearch { + def execute[A](request: ElasticRequest[A, _]): Task[A] +} + +object Elasticsearch { + def execute[A](request: ElasticRequest[A, _]): RIO[Elasticsearch, A] = + ZIO.serviceWithZIO[Elasticsearch](_.execute(request)) + + lazy val layer: URLayer[ElasticExecutor, Elasticsearch] = + ZLayer.fromFunction { executor: ElasticExecutor => + new Elasticsearch { + def execute[A](request: ElasticRequest[A, _]): Task[A] = executor.execute(request) + } + } +} diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index ab9589aa7..2f61a915d 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -66,7 +66,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.bulk(ElasticRequest.create(index, repo)).refreshTrue.execute)( + assertZIO( + addStubMapping *> ElasticExecutor.execute(ElasticRequest.bulk(ElasticRequest.create(index, repo)).refreshTrue) + )( isUnit ) }, @@ -89,11 +91,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .create[GitHubRepo](index = index, doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest + .create[GitHubRepo](index = index, doc = repo) + .routing(Routing("routing")) + .refreshTrue + ) )(equalTo(DocumentId("V4x8q4UB3agN0z75fv5r"))) }, test("creating request with given ID") { @@ -106,11 +109,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest + .create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) + .routing(Routing("routing")) + .refreshTrue + ) )(equalTo(Created)) }, test("creating index request") { @@ -120,7 +124,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.createIndex(name = index, definition = None).execute)( + assertZIO( + addStubMapping *> ElasticExecutor.execute(ElasticRequest.createIndex(name = index, definition = None)) + )( equalTo(Created) ) }, @@ -134,11 +140,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest + .upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) + .routing(Routing("routing")) + .refreshTrue + ) )(isUnit) }, test("deleting by ID request") { @@ -151,11 +158,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest + .deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + .refreshTrue + ) )(equalTo(Deleted)) }, test("deleting by query request") { @@ -168,11 +176,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .deleteByQuery(index = index, query = matchAll) - .refreshTrue - .routing(Routing("routing")) - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest.deleteByQuery(index = index, query = matchAll).refreshTrue.routing(Routing("routing")) + ) )( equalTo(Deleted) ) @@ -186,7 +192,7 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.deleteIndex(name = index).execute)( + assertZIO(addStubMapping *> ElasticExecutor.execute(ElasticRequest.deleteIndex(name = index)))( equalTo(Deleted) ) }, @@ -200,10 +206,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .exists(index = index, id = DocumentId("example-id")) - .routing(Routing("routing")) - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest + .exists(index = index, id = DocumentId("example-id")) + .routing(Routing("routing")) + ) )(isTrue) }, test("getting by ID request") { @@ -231,10 +238,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - .execute + addStubMapping *> ElasticExecutor.execute( + ElasticRequest + .getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + ) )(isSome(equalTo(repo))) }, test("getting by query request") { @@ -284,7 +292,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.search[GitHubRepo](index = index, query = matchAll).execute)( + assertZIO( + addStubMapping *> ElasticExecutor.execute(ElasticRequest.search[GitHubRepo](index = index, query = matchAll)) + )( equalTo(List(repo)) ) } diff --git a/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala index 449050b0c..ba8da5af3 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala @@ -31,8 +31,7 @@ trait WireMockSpec extends ZIOSpecDefault { val port: Int = 9300 val elasticsearchWireMockLayer: TaskLayer[ElasticExecutor] = - HttpClientZioBackend - .layer() >>> (ZLayer.succeed(ElasticConfig.apply("localhost", port)) >>> ElasticExecutor.live) + (HttpClientZioBackend.layer() ++ ZLayer.succeed(ElasticConfig.apply("localhost", port))) >>> ElasticExecutor.live val wireMockServerLayer: TaskLayer[WireMockServer] = { val server = ZIO.acquireRelease(