Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-13542: Fix missing max string length parameter usage in multiple… #9074

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
.maxStringLength(DataUnit.parseDataSize(DEFAULT_MAX_STRING_LENGTH, DataUnit.B).intValue())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
Expand All @@ -48,17 +50,17 @@
import java.util.Optional;

public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();

private final ComponentLog logger;
private final LinkedHashMap<String, JsonPath> jsonPaths;
private final InputStream in;
private final RecordSchema schema;
private final Configuration providerConfiguration;

public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
final String dateFormat, final String timeFormat, final String timestampFormat)
throws MalformedRecordException, IOException {
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, null);
this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, DEFAULT_STREAM_READ_CONSTRAINTS);
}

public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
Expand All @@ -72,6 +74,11 @@ public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths,
this.jsonPaths = jsonPaths;
this.in = in;
this.logger = logger;

final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
final JsonProvider jsonProvider = new JacksonJsonProvider(objectMapper);
providerConfiguration = Configuration.builder().jsonProvider(jsonProvider).build();
}

@Override
Expand All @@ -90,7 +97,7 @@ protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSc
return null;
}

final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
final DocumentContext ctx = JsonPath.using(providerConfiguration).parse(jsonNode.toString());
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());

for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
private final StartingFieldStrategy strategy;

public JsonRecordSource(final InputStream in) throws IOException {
this(in, null, null);
this(in, null, null, DEFAULT_STREAM_READ_CONSTRAINTS);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
this(in, strategy, startingFieldName, new JsonParserFactory());
public JsonRecordSource(final InputStream in, StreamReadConstraints streamReadConstraints) throws IOException {
this(in, null, null, streamReadConstraints);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in, DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
this(in, strategy, startingFieldName, new JsonParserFactory(), streamReadConstraints);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory,
StreamReadConstraints streamReadConstraints) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in, streamReadConstraints, ALLOW_COMMENTS_ENABLED);
this.strategy = strategy;

if (strategy == StartingFieldStrategy.NESTED_FIELD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import org.apache.nifi.json.TokenParserFactory;
import org.yaml.snakeyaml.LoaderOptions;

import java.io.IOException;
import java.io.InputStream;

public class YamlParserFactory implements TokenParserFactory {
private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new YAMLMapper());

/**
* Get Parser implementation for YAML
Expand All @@ -39,6 +39,12 @@ public class YamlParserFactory implements TokenParserFactory {
*/
@Override
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
return YAML_FACTORY.createParser(in);
final LoaderOptions loaderOptions = new LoaderOptions();
loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength());
final YAMLFactory yamlFactory = YAMLFactory.builder()
.loaderOptions(loaderOptions)
.build();

return yamlFactory.setCodec(new YAMLMapper()).createParser(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
*/
package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.StreamReadConstraints;
import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.StartingFieldStrategy;

import java.io.IOException;
import java.io.InputStream;

public class YamlRecordSource extends JsonRecordSource {
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
super(in, strategy, startingFieldName, new YamlParserFactory());
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException {
super(in, strategy, startingFieldName, new YamlParserFactory(), streamReadConstraints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected List<AllowableValue> getSchemaAccessStrategyValues() {

@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in);
final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in, streamReadConstraints);
final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));

return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
protected volatile String startingFieldName;
protected volatile StartingFieldStrategy startingFieldStrategy;
protected volatile SchemaApplicationStrategy schemaApplicationStrategy;
protected volatile StreamReadConstraints streamReadConstraints;
private volatile boolean allowComments;
private volatile StreamReadConstraints streamReadConstraints;

public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("starting-field-strategy")
Expand Down Expand Up @@ -179,7 +179,7 @@ protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccess
}

protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.json.AbstractJsonRowRecordReader;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.logging.ComponentLog;
Expand Down Expand Up @@ -51,17 +49,12 @@ public class YamlTreeReader extends JsonTreeReader {

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
// Remove those properties which are not applicable for YAML
properties.remove(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
properties.remove(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
exceptionfactory marked this conversation as resolved.
Show resolved Hide resolved

return properties;
return new ArrayList<>(super.getSupportedPropertyDescriptors());
}

@Override
protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName);
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints);
}

@Override
Expand All @@ -70,11 +63,6 @@ protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream in,
schemaApplicationStrategy, null);
}

@Override
protected StreamReadConstraints buildStreamReadConstraints(final ConfigurationContext context) {
return StreamReadConstraints.defaults();
}

@Override
protected boolean isAllowCommentsEnabled(final ConfigurationContext context) {
return ALLOW_COMMENTS_DISABLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.json;

import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
Expand Down Expand Up @@ -236,7 +237,7 @@ private RecordSchema inferSchema(final File file, final StartingFieldStrategy st
final InputStream bufferedIn = new BufferedInputStream(in)) {

final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
(var, content) -> new JsonRecordSource(content, strategy, startingFieldName),
(var, content) -> new JsonRecordSource(content, strategy, startingFieldName, StreamReadConstraints.defaults()),
timestampInference, Mockito.mock(ComponentLog.class)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ private void testReadRecords(InputStream jsonStream,

private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName),
(__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
).getSchema(Collections.emptyMap(), jsonStream, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.StreamReadConstraints;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.avro.AvroTypeUtil;
Expand Down Expand Up @@ -1087,7 +1088,7 @@ private void testNestedReadRecords(InputStream yamlStream,

private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
(__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName),
(__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
).getSchema(Collections.emptyMap(), jsonStream, null);
Expand Down