Skip to content

Commit

Permalink
NIFI-13542 Added missing Max String Length property for JSON Readers
Browse files Browse the repository at this point in the history
This closes #9074

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
mark-bathori authored and exceptionfactory committed Jul 15, 2024
1 parent 76a23d9 commit 730b9c6
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
.maxStringLength(DataUnit.parseDataSize(DEFAULT_MAX_STRING_LENGTH, DataUnit.B).intValue())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
Expand All @@ -48,17 +50,17 @@
import java.util.Optional;

public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();

private final ComponentLog logger;
private final LinkedHashMap<String, JsonPath> jsonPaths;
private final InputStream in;
private final RecordSchema schema;
private final Configuration providerConfiguration;

public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
final String dateFormat, final String timeFormat, final String timestampFormat)
throws MalformedRecordException, IOException {
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, null);
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, DEFAULT_STREAM_READ_CONSTRAINTS);
}

public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
Expand All @@ -72,6 +74,11 @@ public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths,
this.jsonPaths = jsonPaths;
this.in = in;
this.logger = logger;

final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
final JsonProvider jsonProvider = new JacksonJsonProvider(objectMapper);
providerConfiguration = Configuration.builder().jsonProvider(jsonProvider).build();
}

@Override
Expand All @@ -90,7 +97,7 @@ protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSc
return null;
}

final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
final DocumentContext ctx = JsonPath.using(providerConfiguration).parse(jsonNode.toString());
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());

for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
private final StartingFieldStrategy strategy;

public JsonRecordSource(final InputStream in) throws IOException {
this(in, null, null);
this(in, null, null, DEFAULT_STREAM_READ_CONSTRAINTS);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
this(in, strategy, startingFieldName, new JsonParserFactory());
public JsonRecordSource(final InputStream in, StreamReadConstraints streamReadConstraints) throws IOException {
this(in, null, null, streamReadConstraints);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in, DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
this(in, strategy, startingFieldName, new JsonParserFactory(), streamReadConstraints);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory,
StreamReadConstraints streamReadConstraints) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in, streamReadConstraints, ALLOW_COMMENTS_ENABLED);
this.strategy = strategy;

if (strategy == StartingFieldStrategy.NESTED_FIELD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import org.apache.nifi.json.TokenParserFactory;
import org.yaml.snakeyaml.LoaderOptions;

import java.io.IOException;
import java.io.InputStream;

public class YamlParserFactory implements TokenParserFactory {
private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new YAMLMapper());

/**
* Get Parser implementation for YAML
Expand All @@ -39,6 +39,12 @@ public class YamlParserFactory implements TokenParserFactory {
*/
@Override
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
return YAML_FACTORY.createParser(in);
final LoaderOptions loaderOptions = new LoaderOptions();
loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength());
final YAMLFactory yamlFactory = YAMLFactory.builder()
.loaderOptions(loaderOptions)
.build();

return yamlFactory.setCodec(new YAMLMapper()).createParser(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
*/
package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.StreamReadConstraints;
import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.StartingFieldStrategy;

import java.io.IOException;
import java.io.InputStream;

public class YamlRecordSource extends JsonRecordSource {
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
super(in, strategy, startingFieldName, new YamlParserFactory());
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
super(in, strategy, startingFieldName, new YamlParserFactory(), streamReadConstraints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected List<AllowableValue> getSchemaAccessStrategyValues() {

@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in);
final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in, streamReadConstraints);
final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));

return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
protected volatile String startingFieldName;
protected volatile StartingFieldStrategy startingFieldStrategy;
protected volatile SchemaApplicationStrategy schemaApplicationStrategy;
protected volatile StreamReadConstraints streamReadConstraints;
private volatile boolean allowComments;
private volatile StreamReadConstraints streamReadConstraints;

public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("starting-field-strategy")
Expand Down Expand Up @@ -179,7 +179,7 @@ protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccess
}

protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.json.AbstractJsonRowRecordReader;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.logging.ComponentLog;
Expand Down Expand Up @@ -51,17 +49,12 @@ public class YamlTreeReader extends JsonTreeReader {

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
// Remove those properties which are not applicable for YAML
properties.remove(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
properties.remove(AbstractJsonRowRecordReader.ALLOW_COMMENTS);

return properties;
return new ArrayList<>(super.getSupportedPropertyDescriptors());
}

@Override
protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName);
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints);
}

@Override
Expand All @@ -70,11 +63,6 @@ protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream in,
schemaApplicationStrategy, null);
}

@Override
protected StreamReadConstraints buildStreamReadConstraints(final ConfigurationContext context) {
return StreamReadConstraints.defaults();
}

@Override
protected boolean isAllowCommentsEnabled(final ConfigurationContext context) {
return ALLOW_COMMENTS_DISABLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.json;

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
Expand Down Expand Up @@ -236,7 +237,7 @@ private RecordSchema inferSchema(final File file, final StartingFieldStrategy st
final InputStream bufferedIn = new BufferedInputStream(in)) {

final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
(var, content) -> new JsonRecordSource(content, strategy, startingFieldName),
(var, content) -> new JsonRecordSource(content, strategy, startingFieldName, StreamReadConstraints.defaults()),
timestampInference, Mockito.mock(ComponentLog.class)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ private void testReadRecords(InputStream jsonStream,

private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName),
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
).getSchema(Collections.emptyMap(), jsonStream, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.StreamReadConstraints;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.avro.AvroTypeUtil;
Expand Down Expand Up @@ -1087,7 +1088,7 @@ private void testNestedReadRecords(InputStream yamlStream,

private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
(__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName),
(__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
).getSchema(Collections.emptyMap(), jsonStream, null);
Expand Down

0 comments on commit 730b9c6

Please sign in to comment.