Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(dsl): Support 'max' aggregation #195

Merged
merged 5 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.ElasticAggregation.{multipleAggregations, termsAggregation}
import zio.elasticsearch.ElasticAggregation.{maxAggregation, multipleAggregations, termsAggregation}
import zio.elasticsearch.ElasticHighlight.highlight
import zio.elasticsearch.ElasticQuery._
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.domain.{TestDocument, TestSubDocument}
import zio.elasticsearch.executor.Executor
import zio.elasticsearch.executor.response.MaxAggregationResponse
import zio.elasticsearch.query.sort.SortMode.Max
import zio.elasticsearch.query.sort.SortOrder._
import zio.elasticsearch.query.sort.SourceType.NumberType
Expand All @@ -44,6 +45,31 @@ object HttpExecutorSpec extends IntegrationSpec {
suite("Executor")(
suite("HTTP Executor")(
suite("aggregation")(
test("aggregate using max aggregation") {
val expectedResponse = ("aggregationInt", MaxAggregationResponse(20.0))
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 20))
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 10))
.refreshTrue
)
aggregation = maxAggregation(name = "aggregationInt", field = TestDocument.intField)
aggsRes <- Executor
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.aggregations
} yield assert(aggsRes.head)(equalTo(expectedResponse))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand All @@ -58,15 +84,9 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
aggregation =
termsAggregation(
name = "aggregationString",
field = TestDocument.stringField.keyword
)
termsAggregation(name = "aggregationString", field = TestDocument.stringField.keyword)
aggsRes <- Executor
.execute(
ElasticRequest
.aggregate(index = firstSearchIndex, aggregation = aggregation)
)
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.aggregations
} yield assert(aggsRes)(isNonEmpty)
}
Expand Down Expand Up @@ -106,7 +126,7 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation with nested terms aggregation") {
test("aggregate using terms aggregation with nested max aggregation") {
arnoldlacko marked this conversation as resolved.
Show resolved Hide resolved
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
Expand All @@ -123,7 +143,7 @@ object HttpExecutorSpec extends IntegrationSpec {
name = "aggregationString",
field = TestDocument.stringField.keyword
)
.withSubAgg(termsAggregation(name = "aggregationInt", field = "intField.keyword"))
.withSubAgg(maxAggregation(name = "aggregationInt", field = "intField"))
aggsRes <- Executor
.execute(
ElasticRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,37 @@ object ElasticAggregation {
* @param name
* aggregation name
* @param field
* field for which terms aggregation will be executed
* the field for which terms aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.TermsAggregation]] that represents terms aggregation to be
* performed.
*/
final def termsAggregation(name: String, field: String): TermsAggregation =
Terms(name = name, field = field, order = Set.empty, subAggregations = Nil, size = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which max aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MaxAggregation]] that represents max aggregation to be performed.
*/
final def maxAggregation(name: String, field: Field[_, Any]): MaxAggregation =
Max(name = name, field = field.toString, missing = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which max aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MaxAggregation]] that represents max aggregation to be performed.
*/
final def maxAggregation(name: String, field: String): MaxAggregation =
Max(name = name, field = field, missing = None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ sealed trait ElasticAggregation { self =>

sealed trait SingleElasticAggregation extends ElasticAggregation

sealed trait MaxAggregation extends SingleElasticAggregation with HasMissing[MaxAggregation] with WithAgg

private[elasticsearch] final case class Max(name: String, field: String, missing: Option[Double])
extends MaxAggregation { self =>
def missing(value: Double): MaxAggregation =
self.copy(missing = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def paramsToJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

Obj(name -> Obj("max" -> (Obj("field" -> field.toJson) merge missingJson)))
}
}

sealed trait MultipleAggregations extends ElasticAggregation with WithAgg {
def aggregations(aggregations: SingleElasticAggregation*): MultipleAggregations
}
Expand All @@ -40,7 +57,7 @@ private[elasticsearch] final case class Multiple(aggregations: List[SingleElasti
def aggregations(aggregations: SingleElasticAggregation*): MultipleAggregations =
self.copy(aggregations = self.aggregations ++ aggregations)

def paramsToJson: Json =
private[elasticsearch] def paramsToJson: Json =
aggregations.map(_.paramsToJson).reduce(_ merge _)

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
Expand Down Expand Up @@ -75,7 +92,7 @@ private[elasticsearch] final case class Terms(
def orderBy(order: AggregationOrder, orders: AggregationOrder*): TermsAggregation =
self.copy(order = self.order + order ++ orders.toSet)

def paramsToJson: Json =
private[elasticsearch] def paramsToJson: Json =
Obj(name -> paramsToJsonHelper)

def size(value: Int): TermsAggregation =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 LambdaWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.elasticsearch.aggregation.options

private[elasticsearch] trait HasMissing[A <: HasMissing[A]] {

/**
* Sets the `missing` parameter for the [[zio.elasticsearch.aggregation.ElasticAggregation]]. The`missing` parameter
* provides a value to use when a document is missing the field that the aggregation is running on.
*
* @param value
* the value to use for missing documents
* @return
* an instance of the [[zio.elasticsearch.aggregation.ElasticAggregation]] enriched with the `missing` parameter.
*/
def missing(value: Double): A
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

sealed trait AggregationResponse

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MaxAggregationResponse {
implicit val decoder: JsonDecoder[MaxAggregationResponse] = DeriveJsonDecoder.gen[MaxAggregationResponse]
}

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand Down Expand Up @@ -65,6 +71,10 @@ private[elasticsearch] object TermsAggregationBucket {
.map(_.unsafeAs[TermsAggregationBucket](TermsAggregationBucket.decoder))
)
)
case str if str.contains("max#") =>
Some(
field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double])
)
}
}
}.toMap
Expand All @@ -76,6 +86,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field: @unchecked) match {
case str if str.contains("terms#") =>
(field.split("#")(1), data.asInstanceOf[TermsAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
TermsAggregationResponse.decoder
.decodeJson(data.toString)
.map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)

}
)
}
Expand Down
Loading