Skip to content

Commit

Permalink
more effective data extraction
Browse files Browse the repository at this point in the history
* requires improvements from GIScience/oshdb#200
* it's no longer necessary to perform a custom preflight request on the ohsome-api side
* uses a custom fork-join pool for parallelized requests to Ignite

fixes https://gitlab.gistools.geog.uni-heidelberg.de/giscience/big-data/ohsome/ohsome-api/issues/60
  • Loading branch information
tyrasd committed Oct 23, 2019
1 parent 83c43fc commit 3401db9
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<properties>
<springboot.version>2.0.3.RELEASE</springboot.version>
<jsonpath.version>2.2.0</jsonpath.version>
<oshdb.version>0.5.4</oshdb.version>
<oshdb.version>0.6.0-SNAPSHOT</oshdb.version>
<jacksondatatype.version>2.9.6</jacksondatatype.version>
<geotools.version>20.2</geotools.version>
<springfox.version>2.9.2</springfox.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public class ElementsRequestExecutor {
public static final String URL = ExtractMetadata.attributionUrl;
public static final String TEXT = ExtractMetadata.attributionShort;
public static final DecimalFormat df = ExecutionUtils.defineDecimalFormat("#.##");
private static final double MAX_STREAM_DATA_SIZE = 1E7;

private ElementsRequestExecutor() {
throw new IllegalStateException("Utility class");
Expand Down Expand Up @@ -103,19 +102,10 @@ public static void executeElements(ElementsGeometry elemGeom, HttpServletRequest
final boolean includeOSMMetadata = inputProcessor.includeOSMMetadata();
final boolean unclippedGeometries = inputProcessor.isUnclipped();
if (DbConnData.db instanceof OSHDBIgnite) {
// do a preflight to get an approximate result data size estimation:
// for now just the sum of the average size of the objects versions in bytes is used
// if that number is larger than 10MB, then fall back to the slightly slower, but much
// less memory intensive streaming implementation (which is currently only available on
// the ignite "AffinityCall" backend).
Number approxResultSize = inputProcessor.processParameters()
.map(data -> ((OSMEntitySnapshot) data).getOSHEntity()).map(data -> (OSHEntityImpl) data)
.sum(data -> data.getLength() / data.getVersions().iterator().next().getVersion());
if (approxResultSize.doubleValue() > MAX_STREAM_DATA_SIZE) {
mapRed = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRed = inputProcessor.processParameters();
}
// on ignite: Use AffinityCall backend, which is the only one properly supporting streaming
// of result data, without buffering the whole result in memory before returning the result.
// This allows data to be written out to the client via a chunked HTTP response.
mapRed = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRed = inputProcessor.processParameters();
}
Expand Down Expand Up @@ -186,17 +176,11 @@ public static void executeElementsFullHistory(ElementsGeometry elemGeom,
MapReducer<OSMEntitySnapshot> mapRedSnapshot = null;
MapReducer<OSMContribution> mapRedContribution = null;
if (DbConnData.db instanceof OSHDBIgnite) {
final double maxStreamDataSize = 1E7;
Number approxResultSize = snapshotInputProcessor.processParameters()
.map(data -> ((OSMEntitySnapshot) data).getOSHEntity()).map(data -> (OSHEntityImpl) data)
.sum(data -> data.getLength() / data.getVersions().iterator().next().getVersion());
if (approxResultSize.doubleValue() > maxStreamDataSize) {
mapRedSnapshot = snapshotInputProcessor.processParameters(ComputeMode.AffinityCall);
mapRedContribution = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRedSnapshot = snapshotInputProcessor.processParameters();
mapRedContribution = inputProcessor.processParameters();
}
// on ignite: Use AffinityCall backend, which is the only one properly supporting streaming
// of result data, without buffering the whole result in memory before returning the result.
// This allows data to be written out to the client via a chunked HTTP response.
mapRedSnapshot = snapshotInputProcessor.processParameters(ComputeMode.AffinityCall);
mapRedContribution = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRedSnapshot = snapshotInputProcessor.processParameters();
mapRedContribution = inputProcessor.processParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -243,11 +245,9 @@ public void streamElementsResponse(HttpServletResponse servletResponse, DataResp
isFirst = new AtomicReference<>(true);
outputStream.print("\n");
if (isFullHistory) {
contributionStream =
writeStreamResponse(outputJsonGen, contributionStream, outputBuffers, outputStream);
writeStreamResponse(outputJsonGen, contributionStream, outputBuffers, outputStream);
}
snapshotStream =
writeStreamResponse(outputJsonGen, snapshotStream, outputBuffers, outputStream);
writeStreamResponse(outputJsonGen, snapshotStream, outputBuffers, outputStream);
outputStream.print("\n ]\n}\n");
servletResponse.flushBuffer();
}
Expand Down Expand Up @@ -1070,31 +1070,36 @@ private ImmutablePair<List<String>, List<String[]>> createCsvResponseForUsersGro
return new ImmutablePair<>(columnNames, rows);
}

private static ForkJoinPool dataExtractionForkJoinPool = new ForkJoinPool(80);
/** Fills the given stream with output data. */
private Stream<org.wololo.geojson.Feature> writeStreamResponse(
private void writeStreamResponse(
ThreadLocal<JsonGenerator> outputJsonGen, Stream<org.wololo.geojson.Feature> stream,
ThreadLocal<ByteArrayOutputStream> outputBuffers, ServletOutputStream outputStream) {
stream.map(data -> {
try {
outputBuffers.get().reset();
outputJsonGen.get().writeObject(data);
return outputBuffers.get().toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).sequential().forEach(data -> {
try {
if (isFirst.get()) {
isFirst.set(false);
} else {
outputStream.print(",");
}
outputStream.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return stream;
ThreadLocal<ByteArrayOutputStream> outputBuffers, final ServletOutputStream outputStream)
throws ExecutionException, InterruptedException {
dataExtractionForkJoinPool.submit(() ->
stream.map(data -> {
try {
outputBuffers.get().reset();
outputJsonGen.get().writeObject(data);
return outputBuffers.get().toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).parallel().forEach(data -> {
synchronized (outputStream) {
try {
if (isFirst.get()) {
isFirst.set(false);
} else {
outputStream.print(",");
}
outputStream.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
})
).get();
}

/** Fills a GeoJSON Feature with the groupByBoundaryId and the geometry. */
Expand Down

0 comments on commit 3401db9

Please sign in to comment.