Merge branch 'stream-elements' into 'master'
use oshdb streaming api for large geojson requests

See merge request giscience/big-data/ohsome/ohsome-api!5
FabiKo117 committed Oct 11, 2018
2 parents 48d5f17 + 3ffde2a commit d7107bc
Showing 6 changed files with 132 additions and 23 deletions.
ElementsRequestExecutor.java
@@ -1,8 +1,11 @@
package org.heigit.bigspatialdata.ohsome.ohsomeapi.executor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
@@ -11,7 +14,9 @@
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -27,18 +32,21 @@
import org.heigit.bigspatialdata.ohsome.ohsomeapi.inputprocessing.InputProcessor;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.inputprocessing.ProcessingData;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.interceptor.RequestInterceptor;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.oshdb.RemoteTagTranslator;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.oshdb.DbConnData;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.oshdb.ExtractMetadata;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.oshdb.RemoteTagTranslator;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.Description;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.Attribution;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.DefaultAggregationResponse;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.Metadata;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.Response;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.StreamMetadata;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.elements.ElementsResult;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.groupbyresponse.GroupByResponse;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.groupbyresponse.GroupByResult;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.rawdataresponse.DataResponse;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBIgnite;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBIgnite.ComputeMode;
import org.heigit.bigspatialdata.oshdb.api.generic.OSHDBCombinedIndex;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableFunction;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.MapAggregator;
@@ -81,7 +89,6 @@ public class ElementsRequestExecutor {
public static void executeElements(RequestParameters requestParams, String osmMetadata,
String includeOSMTags, boolean isGeom, HttpServletResponse response)
throws UnsupportedOperationException, Exception {
final long startTime = System.currentTimeMillis();
MapReducer<OSMEntitySnapshot> mapRed = null;
InputProcessor inputProcessor = new InputProcessor();
String requestUrl = null;
@@ -102,7 +109,26 @@ public static void executeElements(RequestParameters requestParams, String osmMetadata,
} else {
includeTags = false;
}
mapRed = inputProcessor.processParameters(mapRed, requestParams);
if (DbConnData.db instanceof OSHDBIgnite) {
final OSHDBIgnite dbIgnite = (OSHDBIgnite) DbConnData.db;
ComputeMode previousCM = dbIgnite.computeMode();
// do a preflight to get an approximate estimate of the result data size:
// for now, just the sum of the average size of the objects' versions in bytes is used;
// if that number is larger than 10 MB, fall back to the slightly slower, but much
// less memory-intensive streaming implementation (which is currently only available on
// the ignite "AffinityCall" backend).
final double MAX_STREAM_DATA_SIZE = 1E7;
Number approxResultSize = inputProcessor.processParameters(mapRed, requestParams)
.map(data -> ((OSMEntitySnapshot) data).getOSHEntity())
.sum(data -> data.getLength() / data.getLatest().getVersion());
if (approxResultSize.doubleValue() > MAX_STREAM_DATA_SIZE) {
dbIgnite.computeMode(ComputeMode.AffinityCall);
}
mapRed = inputProcessor.processParameters(mapRed, requestParams);
dbIgnite.computeMode(previousCM);
} else {
mapRed = inputProcessor.processParameters(mapRed, requestParams);
}
TagTranslator tt = DbConnData.tagTranslator;
String[] keys = requestParams.getKeys();
String[] values = requestParams.getValues();
@@ -117,8 +143,7 @@ public static void executeElements(RequestParameters requestParams, String osmMetadata,
}
}
}
MapReducer<Feature> preResult = null;
List<Feature> result = null;
final MapReducer<Feature> preResult;
ExecutionUtils exeUtils = new ExecutionUtils();
GeoJSONWriter gjw = new GeoJSONWriter();
RemoteTagTranslator mapTagTranslator = DbConnData.mapTagTranslator;
@@ -145,25 +170,62 @@ public static void executeElements(RequestParameters requestParams, String osmMetadata,
return new org.wololo.geojson.Feature(gjw.write(snapshot.getGeometry()), null);
});
}
result = preResult.collect();
Metadata metadata = null;
Stream<Feature> streamResult = preResult.stream();
StreamMetadata metadata = null;
if (ProcessingData.showMetadata) {
long duration = System.currentTimeMillis() - startTime;
metadata = new Metadata(duration, "Raw OSM data.", requestUrl);
metadata = new StreamMetadata("OSM data as GeoJSON features.", requestUrl);
}
DataResponse osmData = new DataResponse(new Attribution(url, text), Application.apiVersion,
metadata, "FeatureCollection", result);

metadata, "FeatureCollection", Collections.emptyList());
JsonFactory jsonFactory = new JsonFactory();
ServletOutputStream stream = response.getOutputStream();
response.addHeader("Content-disposition", "attachment;filename=ohsomeApiResponse.json");
response.setContentType("application/json");
JsonGenerator jsonGen = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
ByteArrayOutputStream tempStream = new ByteArrayOutputStream();

ObjectMapper objMapper = new ObjectMapper();
objMapper.enable(SerializationFeature.INDENT_OUTPUT);
objMapper.setSerializationInclusion(Include.NON_NULL);
jsonGen.setCodec(objMapper);
jsonGen.writeObject(osmData);
jsonFactory.createGenerator(tempStream, JsonEncoding.UTF8).setCodec(objMapper)
.writeObject(osmData);

String scaffold = tempStream.toString("UTF-8").replaceFirst("]\\r?\\n?\\W*}\\r?\\n?\\W*$", "");

response.addHeader("Content-disposition", "attachment;filename=ohsome.geojson");
response.setContentType("application/geo+json; charset=utf-8");
ServletOutputStream outputStream = response.getOutputStream();
outputStream.write(scaffold.getBytes("UTF-8"));

ThreadLocal<ByteArrayOutputStream> outputBuffers =
ThreadLocal.withInitial(ByteArrayOutputStream::new);
ThreadLocal<JsonGenerator> outputJsonGen = ThreadLocal.withInitial(() -> {
try {
return jsonFactory.createGenerator(outputBuffers.get(), JsonEncoding.UTF8)
.setCodec(objMapper);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
outputStream.print("\n");
AtomicReference<Boolean> isFirst = new AtomicReference<>(true);
streamResult.map(data -> {
try {
outputBuffers.get().reset();
outputJsonGen.get().writeObject(data);
return outputBuffers.get().toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).sequential().forEach(data -> {
try {
if (isFirst.get()) {
isFirst.set(false);
} else {
outputStream.print(",");
}
outputStream.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
outputStream.print("\n ]\n}\n");
response.flushBuffer();
}

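The essence of the executor change above is that the list of GeoJSON features is no longer collected and serialized in one go. Instead, the response object is serialized once with an empty feature list, the closing "]}" is cut off that "scaffold" with a regular expression, the scaffold is written to the servlet output stream, and the features are then appended one by one, comma-separated, before the array and the object are closed by hand. The following is a minimal, self-contained sketch of that pattern, using plain Jackson and hypothetical class and method names (GeoJsonStreamingSketch, writeFeatureCollection) instead of the ohsome API types; it is an illustration, not part of this commit.

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Stream;

public class GeoJsonStreamingSketch {

  /** Writes a GeoJSON FeatureCollection without materializing all features in memory. */
  public static void writeFeatureCollection(OutputStream out, Stream<Object> features)
      throws Exception {
    JsonFactory jsonFactory = new JsonFactory();
    ObjectMapper objMapper = new ObjectMapper();
    objMapper.enable(SerializationFeature.INDENT_OUTPUT);

    // 1. serialize the response skeleton with an empty feature list into a temporary buffer
    Map<String, Object> skeleton = new LinkedHashMap<>();
    skeleton.put("type", "FeatureCollection");
    skeleton.put("features", Collections.emptyList());
    ByteArrayOutputStream tempStream = new ByteArrayOutputStream();
    JsonGenerator jsonGen = jsonFactory.createGenerator(tempStream, JsonEncoding.UTF8);
    jsonGen.setCodec(objMapper);
    jsonGen.writeObject(skeleton);
    jsonGen.flush();

    // 2. cut off the closing "]}" so the feature array stays open (the "scaffold")
    String scaffold = tempStream.toString("UTF-8").replaceFirst("]\\r?\\n?\\W*}\\r?\\n?\\W*$", "");
    out.write(scaffold.getBytes("UTF-8"));
    out.write('\n');

    // 3. append the features one by one, separated by commas
    boolean[] isFirst = {true};
    features.forEachOrdered(feature -> {
      try {
        if (!isFirst[0]) {
          out.write(',');
        }
        isFirst[0] = false;
        out.write(objMapper.writeValueAsBytes(feature));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    // 4. close the feature array and the surrounding object by hand
    out.write("\n ]\n}\n".getBytes("UTF-8"));
    out.flush();
  }
}

Only the scaffold and one serialized feature at a time are held in memory on the serialization side; the output order is preserved because the stream is consumed sequentially.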
BaseMetadata.java
@@ -0,0 +1,15 @@
package org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import io.swagger.annotations.ApiModelProperty;

/**
* Represents the metadata JSON object containing various metadata fields about the request:
* for example, execution time, unit, a description of the result values, or the request URL.
*/
@JsonInclude(Include.NON_NULL)
public abstract class BaseMetadata {
public BaseMetadata() {
}
}
Metadata.java
@@ -9,7 +9,7 @@
* result values, as well as the request URL.
*/
@JsonInclude(Include.NON_NULL)
public class Metadata {
public class Metadata extends BaseMetadata {

@ApiModelProperty(notes = "Time the server needed to execute the request", required = true)
private long executionTime;
Response.java
@@ -25,5 +25,5 @@ public interface Response {

public String getApiVersion();

public Metadata getMetadata();
public BaseMetadata getMetadata();
}
StreamMetadata.java
@@ -0,0 +1,32 @@
package org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import io.swagger.annotations.ApiModelProperty;

/**
* Represents the metadata JSON object containing a description of the result values, as well as
* the request URL.
*/
@JsonInclude(Include.NON_NULL)
public class StreamMetadata extends BaseMetadata {

@ApiModelProperty(notes = "Text describing the result in a sentence", required = true)
private String description;
@ApiModelProperty(notes = "Request URL to which this whole output JSON was generated",
required = true)
private String requestUrl;

public StreamMetadata(String description, String requestUrl) {
this.description = description;
this.requestUrl = requestUrl;
}

public String getDescription() {
return description;
}

public String getRequestUrl() {
return requestUrl;
}
}
DataResponse.java
@@ -5,8 +5,8 @@
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.Attribution;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.Metadata;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.Response;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.output.dataaggregationresponse.StreamMetadata;
import org.wololo.geojson.Feature;

/**
@@ -20,13 +20,13 @@ public class DataResponse implements Response {
@ApiModelProperty(notes = "Version of this api", required = true)
private String apiVersion;
@ApiModelProperty(notes = "Metadata describing the output")
private Metadata metadata;
private StreamMetadata metadata;
@ApiModelProperty(notes = "Type of the GeoJSON", required = true)
private String type;
@ApiModelProperty(notes = "List of GeoJSON features containing the OSM data")
private List<Feature> features;

public DataResponse(Attribution attribution, String apiVersion, Metadata metadata, String type,
public DataResponse(Attribution attribution, String apiVersion, StreamMetadata metadata, String type,
List<Feature> features) {
this.attribution = attribution;
this.apiVersion = apiVersion;
@@ -46,7 +46,7 @@ public String getApiVersion() {
}

@Override
public Metadata getMetadata() {
public StreamMetadata getMetadata() {
return metadata;
}

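The other half of the executor change is the preflight that decides whether to switch the Ignite backend to the "AffinityCall" compute mode: it sums, over all matching OSH entities, the entity's stored byte length divided by its latest version number (roughly the average size of one version), and changes the compute mode once that estimate exceeds 10 MB. Below is a rough sketch of that decision pulled out into its own helper; the class and method names are hypothetical and the import path of OSMEntitySnapshot is an assumption, while MapReducer, OSHDBIgnite, ComputeMode and the called methods are the ones used in the diff above.

// Hypothetical helper mirroring the preflight from the ElementsRequestExecutor diff above
// (not part of this commit; the OSMEntitySnapshot import path is an assumption).
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBIgnite;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBIgnite.ComputeMode;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.MapReducer;
import org.heigit.bigspatialdata.oshdb.api.object.OSMEntitySnapshot;

public class ComputeModeSelectionSketch {

  /** Falls back to the "AffinityCall" backend when the estimated result size exceeds 10 MB. */
  public static void selectComputeMode(OSHDBIgnite dbIgnite, MapReducer<OSMEntitySnapshot> mapRed)
      throws Exception {
    final double maxStreamDataSize = 1E7; // ~10 MB threshold, as in the diff above
    // sum of (stored entity length / number of versions), i.e. the average byte size of one
    // version of each matching entity, added up over all entities
    Number approxResultSize = mapRed
        .map(snapshot -> snapshot.getOSHEntity())
        .sum(osh -> osh.getLength() / osh.getLatest().getVersion());
    if (approxResultSize.doubleValue() > maxStreamDataSize) {
      dbIgnite.computeMode(ComputeMode.AffinityCall);
    }
  }
}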
