Skip to content

Commit

Permalink
allow queries with filters to use fast-lane reducers
Browse files Browse the repository at this point in the history
  • Loading branch information
tyrasd committed Jul 14, 2023
1 parent 34ca36a commit 39c341a
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ cacheName, cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator

@Override
protected <R, S> S mapReduceCellsOSMContribution(
SerializableFunction<OSMContribution, R> mapper,
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner
Expand Down Expand Up @@ -331,7 +331,7 @@ protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(

@Override
protected <R, S> S mapReduceCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, R> mapper,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner
Expand Down Expand Up @@ -371,7 +371,7 @@ protected <R, S> S flatMapReduceCellsOSMEntitySnapshotGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMContribution(
SerializableFunction<OSMContribution, X> mapper) throws Exception {
SerializableFunction<OSMContribution, Optional<X>> mapper) throws Exception {
return stream(Kernels.getOSMContributionCellStreamer(mapper, this));
}

Expand All @@ -383,7 +383,7 @@ protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, X> mapper) throws Exception {
SerializableFunction<OSMEntitySnapshot, Optional<X>> mapper) throws Exception {
return stream(Kernels.getOSMEntitySnapshotCellStreamer(mapper, this));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected MapReducer<X> copy() {

@Override
protected Stream<X> mapStreamCellsOSMContribution(
SerializableFunction<OSMContribution, X> mapper
SerializableFunction<OSMContribution, Optional<X>> mapper
) throws Exception {
throw new UnsupportedOperationException("Stream function not yet implemented");
}
Expand All @@ -92,7 +92,7 @@ protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, X> mapper
SerializableFunction<OSMEntitySnapshot, Optional<X>> mapper
) throws Exception {
throw new UnsupportedOperationException("Stream function not yet implemented");
}
Expand All @@ -115,7 +115,8 @@ public boolean isCancelable() {
}

@Override
protected <R, S> S mapReduceCellsOSMContribution(SerializableFunction<OSMContribution, R> mapper,
protected <R, S> S mapReduceCellsOSMContribution(
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) throws Exception {
return mapReduceOnIgniteCache((OSHDBIgnite) this.oshdb, identitySupplier, combiner,
Expand All @@ -140,7 +141,8 @@ protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(

@Override
protected <R, S> S mapReduceCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner)
throws Exception {
return mapReduceOnIgniteCache((OSHDBIgnite) this.oshdb, identitySupplier, combiner,
Expand Down Expand Up @@ -290,12 +292,13 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {

private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, Optional<R>, S, P> {
MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
List<String> cacheNames, Iterable<CellIdRange> cellIdRanges,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMContribution, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter,
mapper, identitySupplier, accumulator, combiner);
Expand Down Expand Up @@ -339,12 +342,13 @@ public S execute(Ignite node) {

private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, Optional<R>, S, P> {
MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
List<String> cacheNames, Iterable<CellIdRange> cellIdRanges,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter,
mapper, identitySupplier, accumulator, combiner);
Expand Down Expand Up @@ -408,7 +412,7 @@ private static <V, R, M, S, P extends Geometry & Polygonal> S mapReduceOnIgniteC
null
);

if (!oshdb.timeoutInMilliseconds().isPresent()) {
if (oshdb.timeoutInMilliseconds().isEmpty()) {
return asyncResult.get();
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.UUID;
Expand Down Expand Up @@ -110,7 +111,8 @@ public boolean isCancelable() {
// === map-reduce operations ===

@Override
protected <R, S> S mapReduceCellsOSMContribution(SerializableFunction<OSMContribution, R> mapper,
protected <R, S> S mapReduceCellsOSMContribution(
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) throws Exception {
// load tag interpreter helper which is later used for geometry building
Expand Down Expand Up @@ -147,7 +149,8 @@ protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(

@Override
protected <R, S> S mapReduceCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier,
SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner)
throws Exception {
// load tag interpreter helper which is later used for geometry building
Expand Down Expand Up @@ -185,7 +188,7 @@ protected <R, S> S flatMapReduceCellsOSMEntitySnapshotGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMContribution(
SerializableFunction<OSMContribution, X> mapper) throws Exception {
SerializableFunction<OSMContribution, Optional<X>> mapper) throws Exception {
// load tag interpreter helper which is later used for geometry building
TagInterpreter tagInterpreter = this.getTagInterpreter();

Expand Down Expand Up @@ -219,7 +222,7 @@ protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(

@Override
protected Stream<X> mapStreamCellsOSMEntitySnapshot(
SerializableFunction<OSMEntitySnapshot, X> mapper) throws Exception {
SerializableFunction<OSMEntitySnapshot, Optional<X>> mapper) throws Exception {
// load tag interpreter helper which is later used for geometry building
TagInterpreter tagInterpreter = this.getTagInterpreter();

Expand Down Expand Up @@ -367,12 +370,12 @@ S execute(Ignite node, CellProcessor<S> cellProcessor) {

private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, Optional<R>, S, P> {
MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
String cacheName, Map<Integer, TreeMap<Long, CellIdRange>> cellIdRangesByLevel,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMContribution, R> mapper,
SerializableFunction<OSMContribution, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheName, cellIdRangesByLevel, tstamps, bbox, poly, preFilter, filter,
Expand Down Expand Up @@ -417,12 +420,12 @@ public S execute(Ignite node) {

private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob
<R, S, P extends Geometry & Polygonal>
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, R, S, P> {
extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, Optional<R>, S, P> {
MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter,
String cacheName, Map<Integer, TreeMap<Long, CellIdRange>> cellIdRangesByLevel,
SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly,
OSHEntityFilter preFilter, OSMEntityFilter filter,
SerializableFunction<OSMEntitySnapshot, R> mapper,
SerializableFunction<OSMEntitySnapshot, Optional<R>> mapper,
SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator,
SerializableBinaryOperator<S> combiner) {
super(tagInterpreter, cacheName, cellIdRangesByLevel, tstamps, bbox, poly, preFilter, filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,4 @@ class MapReduceOSHDBIgniteAffinityCallTest extends MapReduceOSHDBIgniteTest {
MapReduceOSHDBIgniteAffinityCallTest() throws Exception {
super(oshdb -> oshdb.computeMode(AFFINITY_CALL));
}

@Test
void testOSMEntitySnapshotViewStreamNullValues() throws Exception {
// simple stream query
Set<Integer> result = createMapReducerOSMEntitySnapshot()
.timestamps(
new OSHDBTimestamps("2010-01-01", "2015-01-01", OSHDBTimestamps.Interval.YEARLY))
.filter("id:617308093")
.map(snapshot -> snapshot.getEntity().getUserId())
.map(x -> (Integer) null)
.stream()
.collect(Collectors.toSet());

assertEquals(1, result.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.heigit.ohsome.oshdb.api.mapreducer;

import java.util.Collections;
import org.heigit.ohsome.oshdb.util.function.SerializablePredicate;

/**
* A special map function that represents a filter.
*
* <p>Note that this class is using raw types on purpose because MapReducer's "map functions"
* are designed to input and output arbitrary data types. The necessary type checks are performed
* at at runtime in the respective setters.</p>
*/
@SuppressWarnings({"rawtypes", "unchecked"}) // see javadoc above
class FilterFunction extends MapFunction {
private final SerializablePredicate filter;

FilterFunction(SerializablePredicate filter) {
super((x, ignored) -> filter.test(x)
? Collections.singletonList(x)
: Collections.emptyList(),
true);
this.filter = filter;
}

public boolean test(Object root) {
return this.filter.test(root);
}
}
Loading

0 comments on commit 39c341a

Please sign in to comment.