Skip to content

Commit

Permalink
'#1487 Call nominatim query asynchronously in a future object and
Browse files Browse the repository at this point in the history
reenqueue the item till a response is available. Add metadata info for
country, state, city and suburb.
  • Loading branch information
patrickdalla committed Sep 19, 2023
1 parent 2774ced commit 4189c3d
Showing 1 changed file with 104 additions and 31 deletions.
135 changes: 104 additions & 31 deletions iped-geo/src/main/java/iped/geo/nominatim/NominatimTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -21,6 +25,10 @@
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,6 +46,14 @@ public class NominatimTask extends AbstractTask {
static final String NOMINATIM_METADATA = "nominatim:geojson";
protected static final long CUSTOM_KEEP_ALIVE = 5000;

public static final String NOMINATIM_COUNTRY_METADATA = "nominatim:country";
public static final String NOMINATIM_STATE_METADATA = "nominatim:state";
public static final String NOMINATIM_CITY_METADATA = "nominatim:city";
public static final String NOMINATIM_ADDRESSTYPE_METADATA = "nominatim:addrestype";
public static final String NOMINATIM_SUBURB_METADATA = "nominatim:suburb";

static int MAXTOTAL = 200;

/* The http connection pool is static to be used application wide */
static PoolingHttpClientConnectionManager cm = null;
static ConnectionKeepAliveStrategy myStrategy;
Expand All @@ -57,14 +73,16 @@ public List<Configurable<?>> getConfigurables() {
return Arrays.asList(new NominatimConfig());
}

static private ExecutorService queryExecutor = Executors.newFixedThreadPool(10);

@Override
public void init(ConfigurationManager configurationManager) throws Exception {
count.incrementAndGet();
if (cm == null) {
nominatimConfig = (NominatimConfig) configurationManager.findObject(NominatimConfig.class);

cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(200);
cm.setMaxTotal(MAXTOTAL);
cm.setDefaultMaxPerRoute(nominatimConfig.getConnectionPoolSize());
myStrategy = new ConnectionKeepAliveStrategy() {
@Override
Expand Down Expand Up @@ -116,45 +134,100 @@ public void finish() throws Exception {
}
}

private Future<String> executeQuery(String lat, String longit) {
HttpGet get = new HttpGet(baseUrl + "/reverse?addressdetails=1&format=geojson&lat=" + lat + "&lon=" + longit);

Future<String> f = queryExecutor.submit(() -> {
try {
try (CloseableHttpResponse response = httpClient.execute(get)) {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()))) {
String inputLine;
StringBuffer content = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
content.append(inputLine);
}

return content.toString();
}
}
} catch (ClientProtocolException cpe) {
cpe.printStackTrace();
LOGGER.warn(cpe.getMessage());
} catch (Exception e) {
LOGGER.warn(e.getMessage());
e.printStackTrace();
}
return null;
});

return f;
}

static HashMap<IItem, Future<String>> queries = new HashMap<IItem, Future<String>>();

@Override
protected void process(IItem evidence) throws Exception {
if (!unresolvedServer) {
String featureString = evidence.getMetadata().get(GeofileParser.FEATURE_STRING);

if (featureString == null || featureString.length() < 5) {
String location = evidence.getMetadata().get(ExtraProperties.LOCATIONS);

if (location != null && location.length() >= 1) {
String[] locs = location.split(";"); //$NON-NLS-1$
String lat = locs[0].trim();
String longit = locs[1].trim();

try {
HttpGet get = new HttpGet(baseUrl + "/reverse?format=geojson&lat=" + lat + "&lon=" + longit);
try (CloseableHttpResponse response = httpClient.execute(get)) {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()))) {
String inputLine;
StringBuffer content = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
content.append(inputLine);
}

evidence.getMetadata().add(NOMINATIM_METADATA, content.toString());
}
}
} catch (ClientProtocolException cpe) {
cpe.printStackTrace();
LOGGER.warn(cpe.getMessage());
} catch (Exception e) {
LOGGER.warn(e.getMessage());
e.printStackTrace();
Future<String> result = queries.get(evidence);
if (result != null) {
if (result.isDone()) {
String content = result.get();
processNominatimResult(evidence, content);
queries.remove(evidence);
} else {
reEnqueueItem(evidence);
}
} else {
String featureString = evidence.getMetadata().get(GeofileParser.FEATURE_STRING);

if (featureString == null || featureString.length() < 5) {
String location = evidence.getMetadata().get(ExtraProperties.LOCATIONS);

if (location != null && location.length() >= 1) {
String[] locs = location.split(";"); //$NON-NLS-1$
String lat = locs[0].trim();
String longit = locs[1].trim();

Future<String> f = executeQuery(lat, longit);
queries.put(evidence, f);

reEnqueueItem(evidence);
}
}
}

}
}

private void processNominatimResult(IItem evidence, String content) {
evidence.getMetadata().add(NOMINATIM_METADATA, content);

try {
JSONObject obj = (JSONObject) new JSONParser().parse(content);
JSONArray features = (JSONArray) obj.get("features");
JSONObject properties = (JSONObject) ((JSONObject) features.get(0)).get("properties");
JSONObject address = (JSONObject) properties.get("address");
String country = (String) address.get("country");
evidence.getMetadata().add(NOMINATIM_COUNTRY_METADATA, country);
String state = (String) address.get("state");
evidence.getMetadata().add(NOMINATIM_STATE_METADATA, country + ":" + state);
String city = (String) address.get("city");
evidence.getMetadata().add(NOMINATIM_CITY_METADATA, country + ":" + state + ":" + city);
String suburb = (String) address.get("suburb");
if (suburb != null) {
evidence.getMetadata().add(NOMINATIM_SUBURB_METADATA,
country + ":" + state + ":" + city + ":" + suburb);
}
String addresstype = (String) properties.get("addresstype");
evidence.getMetadata().add(NOMINATIM_ADDRESSTYPE_METADATA, addresstype);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

class IdleConnectionMonitorThread extends Thread {
private final HttpClientConnectionManager connMgr;
private volatile boolean shutdown;
Expand Down

0 comments on commit 4189c3d

Please sign in to comment.