Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to import json dump file (#291) #438

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/main/java/de/komoot/photon/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.komoot.photon.elasticsearch.Server;
import de.komoot.photon.json.JsonDumpConnector;
import de.komoot.photon.nominatim.NominatimConnector;
import de.komoot.photon.nominatim.NominatimUpdater;
import de.komoot.photon.utils.CorsFilter;
Expand Down Expand Up @@ -64,6 +65,12 @@ public static void main(String[] rawArgs) throws Exception {
return;
}

if (args.getJsonImport() != null) {
shutdownES = true;
startJsonDumpImport(args, esServer, esClient);
return;
}

// no special action specified -> normal mode: start search API
startApi(args, esClient);
} finally {
Expand Down Expand Up @@ -106,6 +113,34 @@ private static void startJsonDump(CommandLineArgs args) {
}
}

/**
* take json dump to fill elastic search index
*
* @param args
* @param esServer
* @param esNodeClient
*/
private static void startJsonDumpImport(CommandLineArgs args, Server esServer, Client esNodeClient) {
try {
esServer.recreateIndex(); // delete previous data
} catch (IOException e) {
log.error("cannot setup index, elastic search config files not readable", e);
return;
}

log.info("starting import from json dump to photon with languages: " + args.getLanguages());
log.info("note: languages should be supplied as contained in the dump.");
de.komoot.photon.elasticsearch.Importer importer = new de.komoot.photon.elasticsearch.Importer(esNodeClient, args.getLanguages());
JsonDumpConnector jsonDumpConnector = new JsonDumpConnector(importer, args.getJsonImport(), args.getJsonImportUid());
try {
jsonDumpConnector.readEntireDatabase();
} catch (Exception e) {
log.info("error importing from json dump: " + e.getMessage());
}

log.info("imported data from json dump to photon with languages: " + args.getLanguages());
}


/**
* take nominatim data to fill elastic search index
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/de/komoot/photon/CommandLineArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public class CommandLineArgs {
@Parameter(names = "-nominatim-import", description = "import nominatim database into photon (this will delete previous index)")
private boolean nominatimImport = false;

@Parameter(names = "-json-import", description = "specify json dump to feed into photon (this will delete previous index)")
private String jsonImport = null;

@Parameter(names = "-json-import-uid", description = "JSON entry to use for the UID (default is 'uid')")
private String jsonImportUid = "uid";

@Parameter(names = "-languages", description = "languages nominatim importer should import and use at run-time, comma separated (default is 'en,fr,de,it')")
private String languages = "en,fr,de,it";

Expand Down
15 changes: 15 additions & 0 deletions src/main/java/de/komoot/photon/Connector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package de.komoot.photon;

/**
* Connects to a datasource and imports all documents.
*
* @author holger
*/
public interface Connector {

/**
* reads every document from database and must call the Importer
* for every document
*/
void readEntireDatabase();
}
41 changes: 41 additions & 0 deletions src/main/java/de/komoot/photon/ImportProgressMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package de.komoot.photon;

import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicLong;

import lombok.extern.slf4j.Slf4j;

/**
* Report import progress.
*
* @author holger
*/
@Slf4j
public class ImportProgressMonitor {

private static final int PROGRESS_INTERVAL = 50000;

private final AtomicLong counter = new AtomicLong();
private long startMillis;

public void start() {
startMillis = System.currentTimeMillis();
}

public void progressByOne() {
if (counter.incrementAndGet() % PROGRESS_INTERVAL == 0) {
reportProgress();
}
}

public void finish() {
reportProgress();
}

private void reportProgress() {
final double documentsPerSecond = 1000d * counter.longValue()
/ (System.currentTimeMillis() - startMillis);
log.info(String.format("imported %s documents [%.1f/second]",
MessageFormat.format("{0}", counter.longValue()), documentsPerSecond));
}
}
4 changes: 2 additions & 2 deletions src/main/java/de/komoot/photon/Importer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public interface Importer {
*
* @param doc
*/
public void add(PhotonDoc doc);
void add(PhotonDoc doc);

/**
* import is finished
*/
public void finish();
void finish();
}
26 changes: 23 additions & 3 deletions src/main/java/de/komoot/photon/elasticsearch/Importer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.json.JSONObject;

import java.io.IOException;

Expand Down Expand Up @@ -35,12 +39,28 @@ public Importer(Client esClient, String languages) {
@Override
public void add(PhotonDoc doc) {
try {
this.bulkRequest.add(this.esClient.prepareIndex(indexName, indexType).
setSource(Utils.convert(doc, languages)).setId(doc.getUid()));
add(Utils.convert(doc, languages).bytes(), doc.getUid());
} catch (IOException e) {
log.error("could not bulk add document " + doc.getUid(), e);
return;
}
}

/**
* add a json formatted photon document.
*
* @param source json formatted photon document
*/
public void add(String source, String uidName) {
// this is expensive as ES is parsing the BytesArray again but we would need e.g. jackson that
// parses the string into a Map that could be used from ES without doing the parsing work again
JSONObject json = new JSONObject(source);
String uid = json.getString(uidName);
add(new BytesArray(source), uid);
}

private void add(BytesReference sourceBytes, String uid) {
this.bulkRequest.add(this.esClient.prepareIndex(indexName, indexType)
.setSource(sourceBytes, XContentType.JSON).setId(uid));
this.documentCount += 1;
if (this.documentCount > 0 && this.documentCount % 10000 == 0) {
this.saveDocuments();
Expand Down
59 changes: 59 additions & 0 deletions src/main/java/de/komoot/photon/json/JsonDumpConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package de.komoot.photon.json;

import de.komoot.photon.Connector;
import de.komoot.photon.ImportProgressMonitor;
import de.komoot.photon.elasticsearch.Importer;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

/**
* Import json dump data.
*
* @author holger
*/
@Slf4j
public class JsonDumpConnector implements Connector {

private BufferedReader reader;
private final Importer importer;
private String uidName;

public JsonDumpConnector(de.komoot.photon.Importer importer, String filename, String uidName) {
this.uidName = uidName;
if (importer instanceof Importer) {
this.importer = (Importer) importer;
} else {
throw new IllegalArgumentException(String.format(
"Only importer of type %1 is supported.", Importer.class.getName()));
}
try {
this.reader = new BufferedReader(new FileReader(filename));
} catch (FileNotFoundException e) {
throw new RuntimeException("Json dump file '%1' not found " + filename, e);
}
}

public void readEntireDatabase() {
final ImportProgressMonitor progressMonitor = new ImportProgressMonitor();
progressMonitor.start();

String sourceLine;
try {
while ((reader.readLine()) != null) {
// first line is action line, currently we assume / support only indexing
sourceLine = reader.readLine();
if (sourceLine != null)
importer.add(sourceLine, uidName);

progressMonitor.progressByOne();
}
progressMonitor.finish();
} catch (IOException e) {
log.error("Error importing from json dump file", e);
}
}
}