Skip to content

Commit

Permalink
make data extraction thread pool size customizable via CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
tyrasd committed Oct 23, 2019
1 parent fc64923 commit 4f438fd
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
public class Application implements ApplicationRunner {

public static final String API_VERSION = "0.9";
public static final int DEFAULT_TIMEOUT_IN_MILLISECONDS = 100000;
public static final int DEFAULT_NUMBER_OF_CLUSTER_NODES = 24;
public static final int DEFAULT_NUMBER_OF_DATA_EXTRACTION_THREADS = 40;

/** Main method to run this SpringBootApplication. */
public static void main(String[] args) {
Expand Down Expand Up @@ -54,8 +57,9 @@ public static void preRun(ApplicationArguments args) throws Exception {
boolean multithreading = true;
boolean caching = false;
String dbPrefix = null;
long timeoutInMilliseconds = 100000;
int numberOfClusterNodes = 24;
long timeoutInMilliseconds = DEFAULT_TIMEOUT_IN_MILLISECONDS;
int numberOfClusterNodes = DEFAULT_NUMBER_OF_CLUSTER_NODES;
int numberOfDataExtractionThreads = DEFAULT_NUMBER_OF_DATA_EXTRACTION_THREADS;
// only used when tests are executed directly in Eclipse
if (System.getProperty(dbProperty) != null) {
DbConnData.db = new OSHDBH2(System.getProperty(dbProperty));
Expand Down Expand Up @@ -107,10 +111,13 @@ public static void preRun(ApplicationArguments args) throws Exception {
dbPrefix = args.getOptionValues(paramName).get(0);
break;
case "database.timeout":
timeoutInMilliseconds = Long.valueOf(args.getOptionValues(paramName).get(0));
timeoutInMilliseconds = Long.parseLong(args.getOptionValues(paramName).get(0));
break;
case "cluster.servernodes.count":
numberOfClusterNodes = Integer.valueOf(args.getOptionValues(paramName).get(0));
numberOfClusterNodes = Integer.parseInt(args.getOptionValues(paramName).get(0));
break;
case "cluster.dataextraction.threadcount":
numberOfDataExtractionThreads = Integer.parseInt(args.getOptionValues(paramName).get(0));
break;
default:
break;
Expand All @@ -124,6 +131,7 @@ public static void preRun(ApplicationArguments args) throws Exception {
ProcessingData.setTimeout(timeoutInMilliseconds / 1000.0);
DbConnData.db.timeoutInMilliseconds(timeoutInMilliseconds);
ProcessingData.setNumberOfClusterNodes(numberOfClusterNodes);
ProcessingData.setNumberOfDataExtractionThreads(numberOfDataExtractionThreads);
if (DbConnData.db instanceof OSHDBJdbc) {
DbConnData.db = ((OSHDBJdbc) DbConnData.db).multithreading(multithreading);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@

/** Holds helper methods that are used by the executor classes. */
public class ExecutionUtils {
private static ForkJoinPool dataExtractionForkJoinPool = new ForkJoinPool(80);

private AtomicReference<Boolean> isFirst;
private final ProcessingData processingData;
private final DecimalFormat ratioDf = defineDecimalFormat("#.######");
Expand Down Expand Up @@ -1076,7 +1074,7 @@ private void writeStreamResponse(
ThreadLocal<JsonGenerator> outputJsonGen, Stream<org.wololo.geojson.Feature> stream,
ThreadLocal<ByteArrayOutputStream> outputBuffers, final ServletOutputStream outputStream)
throws ExecutionException, InterruptedException {
dataExtractionForkJoinPool.submit(() ->
ProcessingData.getDataExtractionThreadPool().submit(() ->
stream.map(data -> {
try {
outputBuffers.get().reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import org.geojson.GeoJsonObject;
import org.heigit.bigspatialdata.ohsome.ohsomeapi.executor.RequestParameters;
import org.heigit.bigspatialdata.oshdb.osm.OSMType;
Expand All @@ -13,6 +14,7 @@ public class ProcessingData {

private static Geometry dataPolyGeom;
private static double timeout;
private static ForkJoinPool dataExtractionForkJoinPool = new ForkJoinPool();
private RequestParameters requestParameters;
private String requestUrl;
private BoundaryType boundaryType;
Expand Down Expand Up @@ -166,6 +168,15 @@ public static int getNumberOfClusterNodes() {
public static void setNumberOfClusterNodes(int numberOfClusterNodes) {
ProcessingData.numberOfClusterNodes = numberOfClusterNodes;
}

public static void setNumberOfDataExtractionThreads(int numberOfDataExtractionsThreads) {
dataExtractionForkJoinPool.shutdown();
dataExtractionForkJoinPool = new ForkJoinPool(numberOfDataExtractionsThreads);
}

public static ForkJoinPool getDataExtractionThreadPool() {
return dataExtractionForkJoinPool;
}

public boolean isShareRatio() {
return isShareRatio;
Expand Down

0 comments on commit 4f438fd

Please sign in to comment.