Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(dsl): Support sum aggregation #261

Merged
merged 5 commits into from
Jun 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_sum.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
id: elastic_aggregation_sum
title: "Sum Aggregation"
---

The `Sum` aggregation is a single-value metrics aggregation that keeps track and returns the sum value among the numeric values extracted from the aggregated documents.

In order to use the `Sum` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.SumAggregation
import zio.elasticsearch.ElasticAggregation.sumAggregation
```

You can create a `Sum` aggregation using the `sumAggregation` method this way:
```scala
val aggregation: SumAggregation = sumAggregation(name = "sumAggregation", field = "intField")
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Sum` aggregation using the `sumAggregation` method this way:
```scala
// Document.intField must be number value, because of Sum aggregation
val aggregation: SumAggregation = sumAggregation(name = "sumAggregation", field = Document.intField)
```

If you want to change the `missing` parameter, you can use `missing` method:
```scala
val aggregationWithMissing: SumAggregation = sumAggregation(name = "sumAggregation", field = Document.intField).missing(10.0)
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = sumAggregation(name = "sumAggregation1", field = Document.intField).withAgg(sumAggregation(name = "sumAggregation2", field = Document.doubleField))
```

You can find more information about `Sum` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-metrics-sum-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using sum aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 200))
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 23))
.refreshTrue
)
aggregation = sumAggregation(name = "aggregationInt", field = TestDocument.intField)
aggsRes <- Executor
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.asSumAggregation("aggregationInt")
} yield assert(aggsRes.head.value)(equalTo(223.0))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,34 @@ object ElasticAggregation {
final def multipleAggregations: MultipleAggregations =
Multiple(aggregations = Chunk.empty)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.SumAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which sum aggregation will be executed
* @tparam A
* expected number type
* @return
* an instance of [[zio.elasticsearch.aggregation.SumAggregation]] that represents sum aggregation to be performed.
*/
final def sumAggregation[A: Numeric](name: String, field: Field[_, A]): SumAggregation =
Sum(name = name, field = field.toString, missing = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.SumAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which sum aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.SumAggregation]] that represents sum aggregation to be performed.
*/
final def sumAggregation(name: String, field: String): SumAggregation =
Sum(name = name, field = field, missing = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.TermsAggregation]] using the specified
* parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,23 @@ private[elasticsearch] final case class Multiple(aggregations: Chunk[SingleElast
aggregations.map(_.toJson).reduce(_ merge _)
}

sealed trait SumAggregation extends SingleElasticAggregation with HasMissing[SumAggregation] with WithAgg

private[elasticsearch] final case class Sum(name: String, field: String, missing: Option[Double])
extends SumAggregation { self =>
def missing(value: Double): SumAggregation =
self.copy(missing = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

Obj(name -> Obj("sum" -> (Obj("field" -> field.toJson) merge missingJson)))
}
}

sealed trait TermsAggregation
extends SingleElasticAggregation
with HasOrder[TermsAggregation]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ object AggregationResponse {
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
MinAggregationResult(value)
case SumAggregationResponse(value) =>
SumAggregationResult(value)
case TermsAggregationResponse(docErrorCount, sumOtherDocCount, buckets) =>
TermsAggregationResult(
docErrorCount = docErrorCount,
Expand Down Expand Up @@ -77,6 +79,13 @@ private[elasticsearch] object MinAggregationResponse {
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
implicit val decoder: JsonDecoder[SumAggregationResponse] = DeriveJsonDecoder.gen[SumAggregationResponse]

}

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand Down Expand Up @@ -118,6 +127,8 @@ private[elasticsearch] object TermsAggregationBucket {
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("sum#") =>
Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Some(
field -> TermsAggregationResponse(
Expand Down Expand Up @@ -145,6 +156,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
case str if str.contains("sum#") =>
(field.split("#")(1), data.asInstanceOf[SumAggregationResponse])
case str if str.contains("terms#") =>
(field.split("#")(1), data.asInstanceOf[TermsAggregationResponse])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("min#") =>
MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("sum#") =>
SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("terms#") =>
Expand Down
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.SumAggregationResult]].
*/
def asSumAggregation(name: String): RIO[R, Option[SumAggregationResult]] =
aggregationAs[SumAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ final case class MaxAggregationResult private[elasticsearch] (value: Double) ext

final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class SumAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class TermsAggregationResult private[elasticsearch] (
docErrorCount: Int,
sumOtherDocCount: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
def asMinAggregation(name: String): IO[DecodingException, Option[MinAggregationResult]] =
aggregationAs[MinAggregationResult](name)

def asSumAggregation(name: String): IO[DecodingException, Option[SumAggregationResult]] =
aggregationAs[SumAggregationResult](name)

def asTermsAggregation(name: String): IO[DecodingException, Option[TermsAggregationResult]] =
aggregationAs[TermsAggregationResult](name)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
)
)
},
test("sum") {
val aggregation = sumAggregation("aggregation", "testField")
val aggregationTs = sumAggregation("aggregation", TestSubDocument.intField)
val aggregationTsRaw = sumAggregation("aggregation", TestSubDocument.intField.raw)
val aggregationWithMissing = sumAggregation("aggregation", TestSubDocument.intField).missing(20.0)

assert(aggregation)(equalTo(Sum(name = "aggregation", field = "testField", missing = None))) &&
assert(aggregationTs)(equalTo(Sum(name = "aggregation", field = "intField", missing = None))) &&
assert(aggregationTsRaw)(equalTo(Sum(name = "aggregation", field = "intField.raw", missing = None))) &&
assert(aggregationWithMissing)(
equalTo(Sum(name = "aggregation", field = "intField", missing = Some(20.0)))
)
},
test("terms") {
val aggregation = termsAggregation("aggregation", "testField")
val aggregationTs = termsAggregation("aggregation", TestSubDocument.stringField)
Expand Down Expand Up @@ -725,6 +738,49 @@ object ElasticAggregationSpec extends ZIOSpecDefault {

assert(aggregation.toJson)(equalTo(expected.toJson))
},
test("sum") {
val aggregation = sumAggregation("aggregation", "testField")
val aggregationTs = sumAggregation("aggregation", TestDocument.intField)
val aggregationWithMissing = sumAggregation("aggregation", TestDocument.intField).missing(20.0)

val expected =
"""
|{
| "aggregation": {
| "sum": {
| "field": "testField"
| }
| }
|}
|""".stripMargin

val expectedTs =
"""
|{
| "aggregation": {
| "sum": {
| "field": "intField"
| }
| }
|}
|""".stripMargin

val expectedWithMissing =
"""
|{
| "aggregation": {
| "sum": {
| "field": "intField",
| "missing": 20.0
| }
| }
|}
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("terms") {
val aggregation = termsAggregation("aggregation", "testField")
val aggregationTs = termsAggregation("aggregation", TestDocument.stringField)
Expand Down
1 change: 1 addition & 0 deletions website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ module.exports = {
'overview/aggregations/elastic_aggregation_max',
'overview/aggregations/elastic_aggregation_min',
'overview/aggregations/elastic_aggregation_terms',
'overview/aggregations/elastic_aggregation_sum',
],
},
{
Expand Down