diff --git a/metafacture-json/build.gradle b/metafacture-json/build.gradle index b349373b9..dd63d0eca 100644 --- a/metafacture-json/build.gradle +++ b/metafacture-json/build.gradle @@ -20,6 +20,8 @@ description = 'Modules for processing JSON data in Metafacture' dependencies { api project(':metafacture-framework') implementation 'com.fasterxml.jackson.core:jackson-core:2.8.5' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.8.5' + implementation 'com.jayway.jsonpath:json-path:2.6.0' testImplementation 'junit:junit:4.12' testImplementation 'org.mockito:mockito-core:2.5.5' } diff --git a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java index 80de6981b..f6dbd2238 100644 --- a/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java +++ b/metafacture-json/src/main/java/org/metafacture/json/JsonDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 hbz + * Copyright 2017, 2021 hbz * * Licensed under the Apache License, Version 2.0 the "License"; * you may not use this file except in compliance with the License. @@ -17,12 +17,23 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; + +import org.metafacture.framework.FluxCommand; import org.metafacture.framework.MetafactureException; import org.metafacture.framework.StreamReceiver; +import org.metafacture.framework.annotations.Description; +import org.metafacture.framework.annotations.In; +import org.metafacture.framework.annotations.Out; import org.metafacture.framework.helpers.DefaultObjectPipe; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; /** * Decodes a record in JSON format. @@ -30,6 +41,11 @@ * @author Jens Wille * */ +@Description("Decodes JSON to metadata events. The \'recordPath\' option can be used to set a JsonPath " + + "to extract a path as JSON - or to split the data into multiple JSON documents.") +@In(String.class) +@Out(StreamReceiver.class) +@FluxCommand("decode-json") public final class JsonDecoder extends DefaultObjectPipe { public static final String DEFAULT_ARRAY_MARKER = JsonEncoder.ARRAY_MARKER; @@ -38,6 +54,8 @@ public final class JsonDecoder extends DefaultObjectPipe public static final String DEFAULT_RECORD_ID = "%d"; + public static final String DEFAULT_ROOT_PATH = ""; + private final JsonFactory jsonFactory = new JsonFactory(); private JsonParser jsonParser; @@ -46,12 +64,15 @@ public final class JsonDecoder extends DefaultObjectPipe private String recordId; private int recordCount; + private String recordPath; + public JsonDecoder() { super(); setArrayMarker(DEFAULT_ARRAY_MARKER); setArrayName(DEFAULT_ARRAY_NAME); setRecordId(DEFAULT_RECORD_ID); + setRecordPath(DEFAULT_ROOT_PATH); resetRecordCount(); } @@ -96,25 +117,45 @@ public int getRecordCount() { return recordCount; } + public void setRecordPath(final String recordPath) { + this.recordPath = recordPath; + } + + public String getRecordPath() { + return recordPath; + } + public void resetRecordCount() { setRecordCount(0); } @Override - public void process(final String string) { + public void process(final String json) { assert !isClosed(); - - createParser(string); - - try { - decode(); - } - catch (final IOException e) { - throw new MetafactureException(e); - } - finally { - closeParser(); - } + final List records = recordPath.isEmpty() ? Arrays.asList(json) + : matches(JsonPath.read(json, recordPath)); + records.forEach(record -> { + createParser(record); + try { + decode(); + } catch (final IOException e) { + throw new MetafactureException(e); + } finally { + closeParser(); + } + }); + } + + private List matches(Object obj) { + final List records = (obj instanceof List) ? ((List) obj) : Arrays.asList(obj); + return records.stream().map(doc -> { + try { + return new ObjectMapper().writeValueAsString(doc); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return doc.toString(); + } + }).collect(Collectors.toList()); } @Override diff --git a/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java b/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java index 86c6ae2e1..67cd3a3a8 100644 --- a/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java +++ b/metafacture-json/src/test/java/org/metafacture/json/JsonDecoderTest.java @@ -151,6 +151,22 @@ public void testShouldProcessConcatenatedRecords() { ordered.verify(receiver).endRecord(); } + @Test + public void testShouldProcessRecordsInArray() { + jsonDecoder.setRecordPath("$.data"); + jsonDecoder.process( + "{\"data\":[" + "{\"lit\": \"record 1\"}," + + "{\"lit\": \"record 2\"}" + "]}"); + + final InOrder ordered = inOrder(receiver); + ordered.verify(receiver).startRecord("1"); + ordered.verify(receiver).literal("lit", "record 1"); + ordered.verify(receiver).endRecord(); + ordered.verify(receiver).startRecord("2"); + ordered.verify(receiver).literal("lit", "record 2"); + ordered.verify(receiver).endRecord(); + } + @Test public void testShouldProcessMultipleRecords() { jsonDecoder.process("{\"lit\": \"record 1\"}");