Skip to content

Commit

Permalink
Merge pull request #100 from opencb/TASK-7134
Browse files Browse the repository at this point in the history
TASK-7134 - Re-implement Aggregations Stats for all Catalog Browsers
  • Loading branch information
jtarraga authored Feb 10, 2025
2 parents a3359bc + e0b5d3f commit b2a79c7
Show file tree
Hide file tree
Showing 7 changed files with 1,623 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
/**
* Created by jtarraga on 09/03/17.
*/

public class FacetField {
private String name;
private long count;
Expand All @@ -38,6 +37,13 @@ public FacetField(String name, long count, List<Bucket> buckets) {
this.buckets = buckets;
}

public FacetField(String name, long count, String aggregationName, List<Double> aggregationValues) {
this.name = name;
this.count = count;
this.aggregationName = aggregationName;
this.aggregationValues = aggregationValues;
}

public FacetField(String name, String aggregationName, List<Double> aggregationValues) {
this.name = name;
this.aggregationName = aggregationName;
Expand Down Expand Up @@ -78,7 +84,7 @@ public FacetField setCount(long count) {
}

public FacetField addCount(long delta) {
this.count += delta;
this.count = this.count + delta;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ private <T> DataResult<T> endQuery(List result, long numMatches, double start) {
long end = System.currentTimeMillis();
int numResults = (result != null) ? result.size() : 0;

DataResult<T> queryResult = new DataResult((int) (end - start), Collections.emptyList(), numResults, result, numMatches, null);
return queryResult;
return new DataResult((int) (end - start), Collections.emptyList(), numResults, result, numMatches, null);
}

private DataResult endWrite(long start) {
Expand Down Expand Up @@ -331,31 +330,25 @@ public <T> DataResult<T> aggregate(List<? extends Bson> operations, ComplexTypeC
QueryOptions options) {

long start = startQuery();

DataResult<T> queryResult;
MongoDBIterator<T> iterator = mongoDBNativeQuery.aggregate(operations, converter, options);
// MongoCursor<Document> iterator = output.iterator();
List<T> list = new LinkedList<>();
if (queryResultWriter != null) {
try {
queryResultWriter.open();
if (operations != null && !operations.isEmpty()) {
MongoDBIterator<T> iterator = mongoDBNativeQuery.aggregate(operations, converter, options);
if (queryResultWriter != null) {
try {
queryResultWriter.open();
while (iterator.hasNext()) {
queryResultWriter.write(iterator.next());
}
queryResultWriter.close();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
} else {
while (iterator.hasNext()) {
queryResultWriter.write(iterator.next());
list.add(iterator.next());
}
queryResultWriter.close();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
} else {
// if (converter != null) {
// while (iterator.hasNext()) {
// list.add(converter.convertToDataModelType(iterator.next()));
// }
// } else {
while (iterator.hasNext()) {
list.add((T) iterator.next());
}
// }
}
queryResult = endQuery(list, start);
return queryResult;
Expand Down Expand Up @@ -435,7 +428,7 @@ public DataResult update(ClientSession clientSession, List<? extends Bson> queri

return endWrite(
wr.getMatchedCount(),
wr.getInsertedCount() + wr.getUpserts().size(),
(long) wr.getInsertedCount() + wr.getUpserts().size(),
wr.getModifiedCount(),
wr.getDeletedCount(),
0,
Expand Down Expand Up @@ -553,8 +546,7 @@ public DataResult createIndex(Bson keys, ObjectMap options) {
}

mongoDBNativeQuery.createIndex(keys, i);
DataResult dataResult = endQuery(Collections.emptyList(), start);
return dataResult;
return endQuery(Collections.emptyList(), start);
}

public void dropIndexes() {
Expand All @@ -564,15 +556,13 @@ public void dropIndexes() {
public DataResult dropIndex(Bson keys) {
long start = startQuery();
mongoDBNativeQuery.dropIndex(keys);
DataResult dataResult = endQuery(Collections.emptyList(), start);
return dataResult;
return endQuery(Collections.emptyList(), start);
}

public DataResult<Document> getIndex() {
long start = startQuery();
List<Document> index = mongoDBNativeQuery.getIndex();
DataResult<Document> queryResult = endQuery(index, start);
return queryResult;
return endQuery(index, start);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package org.opencb.commons.datastore.mongodb;

import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.opencb.commons.datastore.core.ComplexTypeConverter;
import org.opencb.commons.datastore.core.FacetField;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;

import static org.opencb.commons.datastore.mongodb.GenericDocumentComplexConverter.TO_REPLACE_DOTS;
import static org.opencb.commons.datastore.mongodb.MongoDBQueryUtils.Accumulator.*;
import static org.opencb.commons.datastore.mongodb.MongoDBQueryUtils.*;

public class MongoDBDocumentToFacetFieldsConverter implements ComplexTypeConverter<List<FacetField>, Document> {

private static final Map<String, String> MONTH_MAP = new HashMap<>();

static {
MONTH_MAP.put("01", "Jan");
MONTH_MAP.put("02", "Feb");
MONTH_MAP.put("03", "Mar");
MONTH_MAP.put("04", "Apr");
MONTH_MAP.put("05", "May");
MONTH_MAP.put("06", "Jun");
MONTH_MAP.put("07", "Jul");
MONTH_MAP.put("08", "Aug");
MONTH_MAP.put("09", "Sep");
MONTH_MAP.put("10", "Oct");
MONTH_MAP.put("11", "Nov");
MONTH_MAP.put("12", "Dec");
}

@Override
public List<FacetField> convertToDataModelType(Document document) {
if (document == null || document.entrySet().size() == 0) {
return Collections.emptyList();
}

String facetFieldName;
List<FacetField> facets = new ArrayList<>();
for (Map.Entry<String, Object> entry : document.entrySet()) {
String key = entry.getKey();
List<Document> documentValues = (List<Document>) entry.getValue();
if (key.endsWith(COUNTS_SUFFIX) || key.endsWith(FACET_ACC_SUFFIX) || key.endsWith(YEAR_SUFFIX) || key.endsWith(MONTH_SUFFIX)
|| key.endsWith(DAY_SUFFIX)) {
facetFieldName = key.split(SEPARATOR)[0].replace(TO_REPLACE_DOTS, ".");

List<FacetField.Bucket> buckets = new ArrayList<>(documentValues.size());
long total = 0;
for (Document documentValue : documentValues) {

long counter = documentValue.getInteger(count.name());
String bucketValue = "";
Object internalIdValue = documentValue.get(INTERNAL_ID);
if (internalIdValue instanceof String) {
bucketValue = (String) internalIdValue;
} else if (internalIdValue instanceof Boolean
|| internalIdValue instanceof Integer
|| internalIdValue instanceof Long
|| internalIdValue instanceof Double) {
bucketValue = internalIdValue.toString();
} else if (internalIdValue instanceof Document) {
bucketValue = StringUtils.join(((Document) internalIdValue).values(), SEPARATOR);
if (key.endsWith(COUNTS_SUFFIX)) {
facetFieldName = key.substring(0, key.indexOf(COUNTS_SUFFIX));
}
}

List<FacetField> bucketFacetFields = null;
if (key.endsWith(FACET_ACC_SUFFIX)) {
String[] split = key.split(SEPARATOR);
String name = split[2];
String aggregationName = split[1];
Double value;
if (documentValue.get(aggregationName) instanceof Integer) {
value = 1.0d * documentValue.getInteger(aggregationName);
} else if (documentValue.get(aggregationName) instanceof Long) {
value = 1.0d * documentValue.getLong(aggregationName);
} else {
value = documentValue.getDouble(aggregationName);
}
List<Double> aggregationValues = Collections.singletonList(value);
FacetField facetField = new FacetField(name.replace(TO_REPLACE_DOTS, "."), aggregationName, aggregationValues);
// Perhaps it’s redundant, as it is also set in the bucket
facetField.setCount(counter);
bucketFacetFields = Collections.singletonList(facetField);
}

buckets.add(new FacetField.Bucket(bucketValue, counter, bucketFacetFields));
total += counter;
}
FacetField facetField = new FacetField(facetFieldName, total, buckets);
facetField.setAggregationName(count.name());
if (key.endsWith(YEAR_SUFFIX) || key.endsWith(MONTH_SUFFIX) || key.endsWith(DAY_SUFFIX)) {
Collections.sort(buckets, Comparator.comparing(FacetField.Bucket::getValue));
if (key.endsWith(MONTH_SUFFIX)) {
for (FacetField.Bucket b : buckets) {
String[] split = b.getValue().split(SEPARATOR);
b.setValue(MONTH_MAP.get(split[1]) + " " + split[0]);
}
} else if (key.endsWith(DAY_SUFFIX)) {
for (FacetField.Bucket b : buckets) {
String[] split = b.getValue().split(SEPARATOR);
b.setValue(split[2] + " " + MONTH_MAP.get(split[1]) + " " + split[0]);
}
}
// Remove the data field and keep year, month and day
List<String> labels = new ArrayList<>(Arrays.asList(key.split(SEPARATOR)));
labels.remove(0);
facetField.setAggregationName(StringUtils.join(labels, SEPARATOR).toLowerCase(Locale.ROOT));
}
facets.add(facetField);
} else if (key.endsWith(RANGES_SUFFIX)) {
List<FacetField.Bucket> buckets = new ArrayList<>(documentValues.size());
int total = 0;

String[] split = key.split(SEPARATOR);
double start = Double.parseDouble(split[1].replace(TO_REPLACE_DOTS, "."));
double end = Double.parseDouble(split[2].replace(TO_REPLACE_DOTS, "."));
double step = Double.parseDouble(split[3].replace(TO_REPLACE_DOTS, "."));

int other = 0;
for (double i = start; i <= end; i += step) {
int bucketCount = getBucketCountFromRanges(i, documentValues);
FacetField.Bucket bucket = new FacetField.Bucket(String.valueOf(roundToTwoSignificantDecimals(i)), bucketCount, null);
buckets.add(bucket);
total += bucketCount;
}

for (Document value : documentValues) {
if (value.get(INTERNAL_ID) instanceof String && OTHER.equals(value.getString(INTERNAL_ID))) {
other = value.getInteger(count.name());
}
}
facetFieldName = key.split(SEPARATOR)[0].replace(TO_REPLACE_DOTS, ".");
if (other > 0) {
FacetField.Bucket bucket = new FacetField.Bucket("Other", other, null);
buckets.add(bucket);
total += bucket.getCount();
}
FacetField facetField = new FacetField(facetFieldName, total, buckets)
.setStart(start)
.setEnd(end)
.setStep(step);
facets.add(facetField);
} else {
Document documentValue = ((List<Document>) entry.getValue()).get(0);
MongoDBQueryUtils.Accumulator accumulator = getAccumulator(documentValue);
switch (accumulator) {
case sum:
case avg:
case max:
case min:
case stdDevPop:
case stdDevSamp: {
List<Double> fieldValues = new ArrayList<>();
if (documentValue.get(accumulator.name()) instanceof Integer) {
fieldValues.add(1.0d * documentValue.getInteger(accumulator.name()));
} else if (documentValue.get(accumulator.name()) instanceof Long) {
fieldValues.add(1.0d * documentValue.getLong(accumulator.name()));
} else if (documentValue.get(accumulator.name()) instanceof List) {
List<Number> list = (List<Number>) documentValue.get(accumulator.name());
for (Number number : list) {
fieldValues.add(number.doubleValue());
}
} else {
fieldValues.add(documentValue.getDouble(accumulator.name()));
}
long count = 0;
if (documentValue.containsKey("count")) {
count = Long.valueOf(documentValue.getInteger("count"));
}
facetFieldName = documentValue.getString(INTERNAL_ID).replace(TO_REPLACE_DOTS, ".");
facets.add(new FacetField(facetFieldName, count, accumulator.name(), fieldValues));
break;
}
default: {
// Do nothing, exception is raised
}
}
}
}
return facets;
}

private MongoDBQueryUtils.Accumulator getAccumulator(Document document) {
for (Map.Entry<String, Object> entry : document.entrySet()) {
try {
MongoDBQueryUtils.Accumulator accumulator = MongoDBQueryUtils.Accumulator.valueOf(entry.getKey());
return accumulator;
} catch (IllegalArgumentException e) {
// Do nothing
}
}
throw new IllegalArgumentException("No accumulators found in facet document: " + StringUtils.join(document.keySet(), ", ")
+ "Valid accumulator functions: " + StringUtils.join(Arrays.asList(count, sum, max, min, avg, stdDevPop, stdDevSamp), ","));
}

@Override
public Document convertToStorageType(List<FacetField> facetFields) {
throw new RuntimeException("Not yet implemented");
}

private static double roundToTwoSignificantDecimals(double value) {
if (value == 0) {
return 0;
}

BigDecimal bd = new BigDecimal(value);
int integerDigits = bd.precision() - bd.scale();
int scale = Math.max(0, 2 + integerDigits);
return bd.setScale(scale, RoundingMode.HALF_UP).doubleValue();
}


private int getBucketCountFromRanges(double inputRange, List<Document> documentValues) {
for (Document document : documentValues) {
if (!OTHER.equals(document.get(INTERNAL_ID))) {
if (inputRange == document.getDouble(INTERNAL_ID)) {
return document.getInteger(count.name());
}
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public <T> MongoDBIterator<T> aggregate(ClientSession clientSession, List<? exte
// we need to be sure that the List is mutable
List<Bson> bsonOperations = new ArrayList<>(operations);
parseQueryOptions(bsonOperations, options);
System.out.println("bsonOperations = " + bsonOperations);
MongoDBIterator<T> iterator = null;
if (bsonOperations.size() > 0) {
long numMatches = -1;
Expand Down
Loading

0 comments on commit b2a79c7

Please sign in to comment.