Skip to content

Commit

Permalink
on dwc2json extractor; related to #148
Browse files Browse the repository at this point in the history
  • Loading branch information
jhpoelen committed Mar 9, 2022
1 parent 2460254 commit 30731a6
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.regex.Pattern;

@Parameters(separators = "= ", commandDescription = "Extract records from DarwinCore archives in line-json")
public class CmdNames extends LoggingPersisting implements Runnable {
public class CmdDwc2Json extends LoggingPersisting implements Runnable {

private InputStream inputStream = System.in;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private JCommander buildCommander() {

// utilities
.addCommand("grep", new CmdGrep(), "match", "findURLs")
.addCommand("dwc", new CmdNames(), "dwc-json", "json-stream", "dwc-json-stream", "dwc-stream")
.addCommand("dwc", new CmdDwc2Json(), "dwc-json", "json-stream", "dwc-json-stream", "dwc-stream")
.addCommand("test", new CmdVerify(), "verify", "check", "validate")
.addCommand("version", new CmdVersion())
.addCommand("seeds", new CmdSeeds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private int getBatchSize() {


private String getActivityDescription() {
return "An activity that finds taxonomic names in any encountered DwC-A content (e.g., hash://sha256/... identifiers).";
return "An activity that streams DwC-A content into line-json.";
}

}
81 changes: 40 additions & 41 deletions preston-dwc/src/main/java/org/gbif/dwc/DwCArchiveStreamHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.rdf.api.IRI;
import org.gbif.dwc.Archive;
import org.gbif.dwc.ArchiveFile;
import org.gbif.dwc.DwcRecordIterator;
import org.gbif.dwc.UnsupportedArchiveException;
import org.gbif.dwc.meta.DwcMetaFiles;
import org.gbif.dwc.record.Record;
import org.gbif.dwc.terms.Term;
Expand All @@ -30,11 +25,9 @@
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DwCArchiveStreamHandler implements ContentStreamHandler {

Expand Down Expand Up @@ -66,17 +59,15 @@ public boolean handle(IRI version, InputStream is) throws ContentStreamException


for (Pair<IRI, ArchiveFile> resourceIRIs : dwcaResourceIRIs) {
IRI key = resourceIRIs.getKey();
ClosableIterator<Record> iterator = iterator(resourceIRIs.getRight(), resourceIRIs.getLeft());
while (iterator.hasNext()) {
Record next = iterator.next();
ObjectNode objectNode = new ObjectMapper().createObjectNode();
objectNode.set("contentId", TextNode.valueOf(resourceIRIs.getLeft().getIRIString()));
objectNode.set("rowType", TextNode.valueOf(resourceIRIs.getRight().getRowType().qualifiedName()));
for (Term term : next.terms()) {
objectNode.set(term.qualifiedName(), TextNode.valueOf(next.value(term)));
ArchiveFile file = resourceIRIs.getRight();
try {
TabularDataFileReader<List<String>> tabularFileReader = createReader(file, resourceIRIs.getLeft());
ClosableIterator<Record> iterator = createRecordIterator(file, tabularFileReader);
while (iterator.hasNext()) {
streamAsJson(resourceIRIs, tabularFileReader, iterator.next());
}
System.out.println(objectNode.toString());
} catch (IOException ex) {
throw new ContentStreamException("failed to read or stream dwc records", ex);
}
}

Expand All @@ -88,6 +79,26 @@ public boolean handle(IRI version, InputStream is) throws ContentStreamException
return false;
}

void streamAsJson(Pair<IRI, ArchiveFile> resourceIRIs, TabularDataFileReader<List<String>> tabularFileReader, Record next) {
ObjectNode objectNode = new ObjectMapper().createObjectNode();
objectNode.set("contentId", TextNode.valueOf("line:" + resourceIRIs.getLeft().getIRIString() + "!/L" + tabularFileReader.getLastRecordLineNumber()));
objectNode.set("rowType", TextNode.valueOf(resourceIRIs.getRight().getRowType().qualifiedName()));
for (Term term : next.terms()) {
objectNode.set(term.qualifiedName(), TextNode.valueOf(next.value(term)));
}
System.out.println(objectNode.toString());
}

ClosableIterator<Record> createRecordIterator(ArchiveFile file, TabularDataFileReader<List<String>> tabularFileReader) {
return new DwcRecordIterator(
tabularFileReader,
file.getId(),
file.getFields(),
file.getRowType(),
false,
false);
}

Pair<IRI, ArchiveFile> getLocation(String iriString, ArchiveFile core) {
String baseIRI = StringUtils.substring(iriString, 0, StringUtils.length(iriString) - META_XML.length());

Expand All @@ -100,30 +111,18 @@ public boolean shouldKeepReading() {
}


public ClosableIterator<Record> iterator(ArchiveFile file, IRI resource) {
try {
CharsetDecoder decoder = Charset.forName(file.getEncoding()).newDecoder();
Reader reader = new InputStreamReader(dereferencer.get(resource), decoder);
BufferedReader bufferedReader = new BufferedReader(reader);

TabularDataFileReader<List<String>> tabularFileReader =
TabularFiles.newTabularFileReader(bufferedReader,
file.getFieldsTerminatedByChar(),
file.getLinesTerminatedBy(),
file.getFieldsEnclosedBy(),
file.areHeaderLinesIncluded(),
file.getLinesToSkipBeforeHeader()
);
return new DwcRecordIterator(
tabularFileReader,
file.getId(),
file.getFields(),
file.getRowType(),
false,
false);
} catch (IOException var4) {
throw new UnsupportedArchiveException(var4);
}
TabularDataFileReader<List<String>> createReader(ArchiveFile file, IRI resource) throws IOException {
CharsetDecoder decoder = Charset.forName(file.getEncoding()).newDecoder();
Reader reader = new InputStreamReader(dereferencer.get(resource), decoder);
BufferedReader bufferedReader = new BufferedReader(reader);

return TabularFiles.newTabularFileReader(bufferedReader,
file.getFieldsTerminatedByChar(),
file.getLinesTerminatedBy(),
file.getFieldsEnclosedBy(),
file.areHeaderLinesIncluded(),
file.getLinesToSkipBeforeHeader()
);
}


Expand Down

0 comments on commit 30731a6

Please sign in to comment.