Commit f19e5b2
Merge branch 'improve-data-extraction' into 'master'
more effective data extraction

Closes #60

See merge request giscience/big-data/ohsome/ohsome-api!30
tyrasd committed Dec 11, 2019
2 parents 1d7a1ec + fe71fbe · commit f19e5b2
Showing 5 changed files with 107 additions and 68 deletions.
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@
<properties>
<springboot.version>2.0.3.RELEASE</springboot.version>
<jsonpath.version>2.2.0</jsonpath.version>
<oshdb.version>0.5.4</oshdb.version>
<oshdb.version>0.5.5</oshdb.version>
<jacksondatatype.version>2.9.6</jacksondatatype.version>
<geotools.version>20.2</geotools.version>
<springfox.version>2.9.2</springfox.version>
Application.java
@@ -27,6 +27,9 @@
public class Application implements ApplicationRunner {

public static final String API_VERSION = "0.9";
public static final int DEFAULT_TIMEOUT_IN_MILLISECONDS = 100000;
public static final int DEFAULT_NUMBER_OF_CLUSTER_NODES = 24;
public static final int DEFAULT_NUMBER_OF_DATA_EXTRACTION_THREADS = 40;

/** Main method to run this SpringBootApplication. */
public static void main(String[] args) {
@@ -40,12 +43,14 @@ public static void main(String[] args) {
preRun(new DefaultApplicationArguments(args));
SpringApplication.run(Application.class, args);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}

/**
* Reads and sets the given application arguments and makes a connection to the OSHDB.
*
* @param args Application arguments given over the commandline on startup
* @throws Exception if the connection to the db cannot be established
*/
@@ -54,8 +59,9 @@ public static void preRun(ApplicationArguments args) throws Exception {
boolean multithreading = true;
boolean caching = false;
String dbPrefix = null;
long timeoutInMilliseconds = 100000;
int numberOfClusterNodes = 24;
long timeoutInMilliseconds = DEFAULT_TIMEOUT_IN_MILLISECONDS;
int numberOfClusterNodes = DEFAULT_NUMBER_OF_CLUSTER_NODES;
int numberOfDataExtractionThreads = DEFAULT_NUMBER_OF_DATA_EXTRACTION_THREADS;
// only used when tests are executed directly in Eclipse
if (System.getProperty(dbProperty) != null) {
DbConnData.db = new OSHDBH2(System.getProperty(dbProperty));
@@ -107,10 +113,14 @@ public static void preRun(ApplicationArguments args) throws Exception {
dbPrefix = args.getOptionValues(paramName).get(0);
break;
case "database.timeout":
timeoutInMilliseconds = Long.valueOf(args.getOptionValues(paramName).get(0));
timeoutInMilliseconds = Long.parseLong(args.getOptionValues(paramName).get(0));
break;
case "cluster.servernodes.count":
numberOfClusterNodes = Integer.valueOf(args.getOptionValues(paramName).get(0));
numberOfClusterNodes = Integer.parseInt(args.getOptionValues(paramName).get(0));
break;
case "cluster.dataextraction.threadcount":
numberOfDataExtractionThreads =
Integer.parseInt(args.getOptionValues(paramName).get(0));
break;
default:
break;
@@ -124,6 +134,7 @@ public static void preRun(ApplicationArguments args) throws Exception {
ProcessingData.setTimeout(timeoutInMilliseconds / 1000.0);
DbConnData.db.timeoutInMilliseconds(timeoutInMilliseconds);
ProcessingData.setNumberOfClusterNodes(numberOfClusterNodes);
ProcessingData.setNumberOfDataExtractionThreads(numberOfDataExtractionThreads);
if (DbConnData.db instanceof OSHDBJdbc) {
DbConnData.db = ((OSHDBJdbc) DbConnData.db).multithreading(multithreading);
}
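
As a usage note (illustrative, not part of the commit): the new option is passed on the command line like the existing ones, e.g.

  java -jar ohsome-api.jar --cluster.dataextraction.threadcount=16

where the jar name and the value 16 are assumptions for the example; if the option is omitted, DEFAULT_NUMBER_OF_DATA_EXTRACTION_THREADS (40) applies. Other startup options (e.g. the database connection) are left out here for brevity.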
ElementsRequestExecutor.java
@@ -51,7 +51,6 @@
import org.heigit.bigspatialdata.oshdb.api.mapreducer.MapReducer;
import org.heigit.bigspatialdata.oshdb.api.object.OSMContribution;
import org.heigit.bigspatialdata.oshdb.api.object.OSMEntitySnapshot;
import org.heigit.bigspatialdata.oshdb.impl.osh.OSHEntityImpl;
import org.heigit.bigspatialdata.oshdb.osm.OSMEntity;
import org.heigit.bigspatialdata.oshdb.osm.OSMType;
import org.heigit.bigspatialdata.oshdb.util.OSHDBTag;
@@ -71,7 +70,6 @@ public class ElementsRequestExecutor {
public static final String URL = ExtractMetadata.attributionUrl;
public static final String TEXT = ExtractMetadata.attributionShort;
public static final DecimalFormat df = ExecutionUtils.defineDecimalFormat("#.##");
private static final double MAX_STREAM_DATA_SIZE = 1E7;

private ElementsRequestExecutor() {
throw new IllegalStateException("Utility class");
@@ -103,19 +101,10 @@ public static void executeElements(ElementsGeometry elemGeom, HttpServletRequest
final boolean includeOSMMetadata = inputProcessor.includeOSMMetadata();
final boolean unclippedGeometries = inputProcessor.isUnclipped();
if (DbConnData.db instanceof OSHDBIgnite) {
// do a preflight to get an approximate estimate of the result data size:
// for now, just the sum of the average sizes of the objects' versions in bytes is used;
// if that number is larger than 10 MB, fall back to the slightly slower, but much
// less memory-intensive streaming implementation (which is currently only available on
// the ignite "AffinityCall" backend).
Number approxResultSize = inputProcessor.processParameters()
.map(data -> ((OSMEntitySnapshot) data).getOSHEntity()).map(data -> (OSHEntityImpl) data)
.sum(data -> data.getLength() / data.getVersions().iterator().next().getVersion());
if (approxResultSize.doubleValue() > MAX_STREAM_DATA_SIZE) {
mapRed = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRed = inputProcessor.processParameters();
}
// on ignite: use the AffinityCall backend, which is the only one that properly supports
// streaming of result data without buffering the whole result in memory before returning it.
// This allows data to be written out to the client via a chunked HTTP response.
mapRed = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRed = inputProcessor.processParameters();
}
@@ -186,17 +175,11 @@ public static void executeElementsFullHistory(ElementsGeometry elemGeom,
MapReducer<OSMEntitySnapshot> mapRedSnapshot = null;
MapReducer<OSMContribution> mapRedContribution = null;
if (DbConnData.db instanceof OSHDBIgnite) {
final double maxStreamDataSize = 1E7;
Number approxResultSize = snapshotInputProcessor.processParameters()
.map(data -> ((OSMEntitySnapshot) data).getOSHEntity()).map(data -> (OSHEntityImpl) data)
.sum(data -> data.getLength() / data.getVersions().iterator().next().getVersion());
if (approxResultSize.doubleValue() > maxStreamDataSize) {
mapRedSnapshot = snapshotInputProcessor.processParameters(ComputeMode.AffinityCall);
mapRedContribution = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRedSnapshot = snapshotInputProcessor.processParameters();
mapRedContribution = inputProcessor.processParameters();
}
// on ignite: use the AffinityCall backend, which is the only one that properly supports
// streaming of result data without buffering the whole result in memory before returning it.
// This allows data to be written out to the client via a chunked HTTP response.
mapRedSnapshot = snapshotInputProcessor.processParameters(ComputeMode.AffinityCall);
mapRedContribution = inputProcessor.processParameters(ComputeMode.AffinityCall);
} else {
mapRedSnapshot = snapshotInputProcessor.processParameters();
mapRedContribution = inputProcessor.processParameters();
ExecutionUtils.java
@@ -4,12 +4,15 @@
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.opencsv.CSVWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.ArrayList;
@@ -21,7 +24,11 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -86,8 +93,7 @@

/** Holds helper methods that are used by the executor classes. */
public class ExecutionUtils {

AtomicReference<Boolean> isFirst;
private AtomicReference<Boolean> isFirst;
private final ProcessingData processingData;
private final DecimalFormat ratioDf = defineDecimalFormat("#.######");

@@ -223,31 +229,38 @@ public void streamElementsResponse(HttpServletResponse servletResponse, DataResp
jsonFactory.createGenerator(tempStream, JsonEncoding.UTF8).setCodec(objMapper)
.writeObject(osmData);

String scaffold = tempStream.toString("UTF-8").replaceFirst("]\\r?\\n?\\W*}\\r?\\n?\\W*$", "");
String scaffold = tempStream.toString("UTF-8").replaceFirst("\\s*]\\s*}\\s*$", "");

servletResponse.setContentType("application/geo+json; charset=utf-8");
ServletOutputStream outputStream = servletResponse.getOutputStream();
outputStream.write(scaffold.getBytes("UTF-8"));
outputStream.write(scaffold.getBytes(StandardCharsets.UTF_8));

ThreadLocal<ByteArrayOutputStream> outputBuffers =
ThreadLocal.withInitial(ByteArrayOutputStream::new);
ThreadLocal<JsonGenerator> outputJsonGen = ThreadLocal.withInitial(() -> {
try {
DefaultPrettyPrinter.Indenter indenter = new DefaultIndenter() {
@Override
public void writeIndentation(JsonGenerator g, int level) throws IOException {
super.writeIndentation(g, level + 1);
}
};
DefaultPrettyPrinter printer = new DefaultPrettyPrinter("")
.withArrayIndenter(indenter)
.withObjectIndenter(indenter);
return jsonFactory.createGenerator(outputBuffers.get(), JsonEncoding.UTF8)
.setCodec(objMapper);
.setCodec(objMapper)
.setPrettyPrinter(printer);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
isFirst = new AtomicReference<>(true);
outputStream.print("\n");
if (isFullHistory) {
contributionStream =
writeStreamResponse(outputJsonGen, contributionStream, outputBuffers, outputStream);
writeStreamResponse(outputJsonGen, contributionStream, outputBuffers, outputStream);
}
snapshotStream =
writeStreamResponse(outputJsonGen, snapshotStream, outputBuffers, outputStream);
outputStream.print("\n ]\n}\n");
writeStreamResponse(outputJsonGen, snapshotStream, outputBuffers, outputStream);
outputStream.print("]\n}\n");
servletResponse.flushBuffer();
}
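
Aside: the "scaffold" technique used above may be easier to follow in isolation. The following is a minimal, self-contained sketch of the same idea; the class and method names (ScaffoldStreamingSketch, streamFeatureCollection) are hypothetical, and the real code serializes the actual DataResponse object through thread-local Jackson generators instead of a plain ObjectMapper:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ScaffoldStreamingSketch {

  /** Streams a GeoJSON-like FeatureCollection without holding all features in memory at once. */
  public static void streamFeatureCollection(OutputStream out,
      Iterable<Map<String, Object>> features) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    // serialize the envelope once, with an empty "features" array ...
    Map<String, Object> envelope = new LinkedHashMap<>();
    envelope.put("type", "FeatureCollection");
    envelope.put("features", List.of());
    // ... and strip the closing "]}" so individual features can be appended afterwards
    String scaffold = mapper.writeValueAsString(envelope).replaceFirst("\\s*]\\s*}\\s*$", "");
    out.write(scaffold.getBytes(StandardCharsets.UTF_8));
    boolean first = true;
    for (Map<String, Object> feature : features) {
      if (!first) {
        out.write(',');
      }
      first = false;
      out.write(mapper.writeValueAsBytes(feature)); // each feature is written incrementally
    }
    // re-append the closing brackets that were stripped from the scaffold
    out.write("]}".getBytes(StandardCharsets.UTF_8));
  }
}

The custom DefaultIndenter override above serves a related cosmetic purpose: each streamed feature is indented one extra level, so it lines up as if it had been serialized inside the scaffold's "features" array.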

Expand Down Expand Up @@ -1069,31 +1082,54 @@ private ImmutablePair<List<String>, List<String[]>> createCsvResponseForUsersGro
return new ImmutablePair<>(columnNames, rows);
}

/** Fills the given stream with output data. */
private Stream<org.wololo.geojson.Feature> writeStreamResponse(
ThreadLocal<JsonGenerator> outputJsonGen, Stream<org.wololo.geojson.Feature> stream,
ThreadLocal<ByteArrayOutputStream> outputBuffers, ServletOutputStream outputStream) {
stream.map(data -> {
try {
outputBuffers.get().reset();
outputJsonGen.get().writeObject(data);
return outputBuffers.get().toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).sequential().forEach(data -> {
try {
if (isFirst.get()) {
isFirst.set(false);
} else {
outputStream.print(",");
/** Writes the given stream of features out to the client, using multiple threads in parallel. */
private void writeStreamResponse(ThreadLocal<JsonGenerator> outputJsonGen,
Stream<org.wololo.geojson.Feature> stream, ThreadLocal<ByteArrayOutputStream> outputBuffers,
final ServletOutputStream outputStream)
throws ExecutionException, InterruptedException, IOException {
ReentrantLock lock = new ReentrantLock();
AtomicBoolean errored = new AtomicBoolean(false);
ForkJoinPool threadPool = new ForkJoinPool(ProcessingData.getNumberOfDataExtractionThreads());
try {
threadPool.submit(() -> stream.parallel().map(data -> {
// 1. convert features to GeoJSON
try {
outputBuffers.get().reset();
outputJsonGen.get().writeObject(data);
return outputBuffers.get().toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
outputStream.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return stream;
}).forEach(data -> {
// 2. write data out to client
// only 1 thread is allowed to write at once!
lock.lock();
if (errored.get()) {
// when any one thread experiences an exception (e.g. a client disconnects),
// the "errored" flag is set and all threads abort themselves by throwing an exception
lock.unlock();
throw new RuntimeException();
}
try {
// separate features in the result by a comma, except for the very first one
if (isFirst.get()) {
isFirst.set(false);
} else {
outputStream.print(", ");
}
// write the feature
outputStream.write(data);
} catch (IOException e) {
errored.set(true);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
})).get();
} finally {
threadPool.shutdown();
outputStream.flush();
}
}
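
The concurrency pattern introduced here (encode in parallel, write under a lock, abort cooperatively via the "errored" flag) can be shown in a generic, self-contained form. This is a sketch with hypothetical names (ParallelStreamWriterSketch, writeAll), not the actual ohsome-api code:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Stream;

public class ParallelStreamWriterSketch {

  /** Encodes items on several threads, but writes them to the output strictly one at a time. */
  public static <T> void writeAll(Stream<T> items, Function<T, byte[]> encode, OutputStream out,
      int threads) throws ExecutionException, InterruptedException {
    ReentrantLock lock = new ReentrantLock();
    AtomicBoolean errored = new AtomicBoolean(false);
    // a dedicated pool caps this stream's parallelism independently of the common pool
    ForkJoinPool pool = new ForkJoinPool(threads);
    try {
      pool.submit(() -> items.parallel()
          .map(encode) // CPU-heavy encoding runs concurrently on all pool threads
          .forEach(bytes -> {
            lock.lock(); // only one thread may touch the output stream at a time
            try {
              if (errored.get()) {
                // another writer already failed (e.g. the client disconnected): abort as well
                throw new RuntimeException("aborted");
              }
              out.write(bytes);
            } catch (IOException e) {
              errored.set(true);
              throw new RuntimeException(e);
            } finally {
              lock.unlock();
            }
          })).get();
    } finally {
      pool.shutdown();
    }
  }
}

Note that forEach on a parallel stream does not preserve encounter order, so features may reach the client in a nondeterministic order (the previous implementation's .sequential() call did preserve it); for a GeoJSON FeatureCollection this is generally acceptable, and it is what lets fast encoder threads write without waiting for slower ones.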

/** Fills a GeoJSON Feature with the groupByBoundaryId and the geometry. */
ProcessingData.java
@@ -13,6 +13,7 @@ public class ProcessingData {

private static Geometry dataPolyGeom;
private static double timeout;
private static int numberOfDataExtractionThreads = 1;
private RequestParameters requestParameters;
private String requestUrl;
private BoundaryType boundaryType;
@@ -158,15 +159,23 @@ public Set<SimpleFeatureType> getSimpleFeatureTypes() {
public void setSimpleFeatureTypes(Set<SimpleFeatureType> simpleFeatureTypes) {
this.simpleFeatureTypes = (EnumSet<SimpleFeatureType>) simpleFeatureTypes;
}

public static int getNumberOfClusterNodes() {
return numberOfClusterNodes;
}

public static void setNumberOfClusterNodes(int numberOfClusterNodes) {
ProcessingData.numberOfClusterNodes = numberOfClusterNodes;
}

public static void setNumberOfDataExtractionThreads(int numberOfDataExtractionThreads) {
ProcessingData.numberOfDataExtractionThreads = numberOfDataExtractionThreads;
}

public static int getNumberOfDataExtractionThreads() {
return numberOfDataExtractionThreads;
}

public boolean isShareRatio() {
return isShareRatio;
}
