NIFI-13590 Refactor components in standard-processors bundle using current API methods #9119

Closed · wants to merge 3 commits
@@ -50,7 +50,6 @@
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -67,18 +66,18 @@
 import static java.sql.Types.BIT;
 import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
+import static java.sql.Types.CHAR;
 import static java.sql.Types.CLOB;
+import static java.sql.Types.DATE;
 import static java.sql.Types.DECIMAL;
 import static java.sql.Types.DOUBLE;
 import static java.sql.Types.FLOAT;
 import static java.sql.Types.INTEGER;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.NUMERIC;
-import static java.sql.Types.CHAR;
-import static java.sql.Types.DATE;
 import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
 import static java.sql.Types.LONGVARCHAR;
 import static java.sql.Types.NCHAR;
+import static java.sql.Types.NUMERIC;
 import static java.sql.Types.NVARCHAR;
 import static java.sql.Types.REAL;
 import static java.sql.Types.ROWID;
@@ -283,10 +282,10 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
             columnTypeMap.clear();
         }

-        final List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.toLowerCase().split(","));
+        final String[] maxValueColumnNameList = maxValueColumnNames.toLowerCase().split(",");
         final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();

-        for (String maxValueColumn:maxValueColumnNameList) {
+        for (String maxValueColumn : maxValueColumnNameList) {
             String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
             maxValueQualifiedColumnNameList.add(colKey);
         }
@@ -304,7 +303,7 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
             columnTypeMap.putIfAbsent(colKey, colType);
         }

-        for (String maxValueColumn:maxValueColumnNameList) {
+        for (String maxValueColumn : maxValueColumnNameList) {
             String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
             if (!columnTypeMap.containsKey(colKey)) {
                 throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
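Note: the hunks above (in the database-fetch base processor, per the `setup()` context lines) replace `Arrays.asList(...)` with the raw `String[]` that `split(",")` already returns. The enhanced for loop iterates arrays directly, so the `List` wrapper added an object for no benefit. A minimal standalone sketch of the same idiom (hypothetical class, not the NiFi source):

```java
// Sketch: iterating split() output directly, as the refactor above does.
public class SplitIterationSketch {
    public static void main(String[] args) {
        String maxValueColumnNames = "ID, UPDATED_AT ,VERSION";
        // split() already yields a String[], which the for-each loop accepts;
        // Arrays.asList() would only add an intermediate wrapper object.
        for (String maxValueColumn : maxValueColumnNames.toLowerCase().split(",")) {
            System.out.println("[" + maxValueColumn.trim() + "]"); // trim strips stray spaces
        }
    }
}
```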
@@ -517,13 +517,13 @@ protected Pair<String, SQLException> executeConfigStatements(final Connection co
      * Extract list of queries from config property
      */
     protected List<String> getQueries(final String value) {
-        if (value == null || value.length() == 0 || value.trim().length() == 0) {
+        if (value == null || value.isEmpty() || value.isBlank()) {
             return null;
         }
         final List<String> queries = new LinkedList<>();
         for (String query : value.split("(?<!\\\\);")) {
             query = query.replaceAll("\\\\;", ";");
-            if (query.trim().length() > 0) {
+            if (!query.isBlank()) {
                 queries.add(query.trim());
             }
         }
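One nit the diff keeps: `value.isEmpty() || value.isBlank()` is redundant, since `isBlank()` is true for every empty string, so `value == null || value.isBlank()` alone would behave identically. The split regex is also worth unpacking: `(?<!\\);` splits on semicolons not preceded by a backslash, so `\;` survives as an escaped literal and is unescaped afterwards. A runnable standalone sketch of the same logic (not the NiFi class itself):

```java
import java.util.ArrayList;
import java.util.List;

public class QuerySplitSketch {
    static List<String> getQueries(String value) {
        if (value == null || value.isBlank()) { // isBlank() already covers the empty case
            return null;
        }
        final List<String> queries = new ArrayList<>();
        // Regex (?<!\\); = a semicolon NOT preceded by a backslash (negative lookbehind).
        for (String query : value.split("(?<!\\\\);")) {
            query = query.replaceAll("\\\\;", ";"); // unescape \; back to ;
            if (!query.isBlank()) {
                queries.add(query.trim());
            }
        }
        return queries;
    }

    public static void main(String[] args) {
        // "\\;" in source is the two characters \; — an escaped semicolon.
        System.out.println(getQueries("SET x = 'a\\;b'; SELECT 1; "));
        // -> [SET x = 'a;b', SELECT 1]
    }
}
```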
@@ -38,7 +38,6 @@
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -53,15 +52,13 @@
  */
 public abstract class AbstractJsonPathProcessor extends AbstractProcessor {

-    static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>();
-
     static final String EMPTY_STRING_OPTION = "empty string";
     static final String NULL_STRING_OPTION = "the string 'null'";

-    static {
-        NULL_REPRESENTATION_MAP.put(EMPTY_STRING_OPTION, "");
-        NULL_REPRESENTATION_MAP.put(NULL_STRING_OPTION, "null");
-    }
+    static final Map<String, String> NULL_REPRESENTATION_MAP = Map.of(
+            EMPTY_STRING_OPTION, "",
+            NULL_STRING_OPTION, "null"
+    );

     public static final PropertyDescriptor NULL_VALUE_DEFAULT_REPRESENTATION = new PropertyDescriptor.Builder()
             .name("Null Value Representation")
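In AbstractJsonPathProcessor, `Map.of` collapses the mutable `HashMap` plus static initializer into one expression and makes the lookup table genuinely immutable: the returned map rejects later `put` calls and disallows null keys and values outright. A small sketch contrasting the behavior (hypothetical class, same idiom as the diff):

```java
import java.util.Map;

public class NullRepresentationSketch {
    static final String EMPTY_STRING_OPTION = "empty string";
    static final String NULL_STRING_OPTION = "the string 'null'";

    // Map.of builds an unmodifiable map in one expression; mutation attempts
    // throw at runtime, which suits a fixed lookup table like this one.
    static final Map<String, String> NULL_REPRESENTATION_MAP = Map.of(
            EMPTY_STRING_OPTION, "",
            NULL_STRING_OPTION, "null"
    );

    public static void main(String[] args) {
        System.out.println(NULL_REPRESENTATION_MAP.get(NULL_STRING_OPTION)); // prints: null
        // NULL_REPRESENTATION_MAP.put("x", "y"); // would throw UnsupportedOperationException
    }
}
```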
@@ -69,23 +69,23 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
     public static final String RESULT_TABLENAME = "tablename";
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";

-    private static AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
+    private static final AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
             String.valueOf(Connection.TRANSACTION_READ_COMMITTED),
             "TRANSACTION_READ_COMMITTED"
     );
-    private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
+    private static final AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
             String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED),
             "TRANSACTION_READ_UNCOMMITTED"
     );
-    private static AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
+    private static final AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
             String.valueOf(Connection.TRANSACTION_REPEATABLE_READ),
             "TRANSACTION_REPEATABLE_READ"
     );
-    private static AllowableValue TRANSACTION_NONE = new AllowableValue(
+    private static final AllowableValue TRANSACTION_NONE = new AllowableValue(
             String.valueOf(Connection.TRANSACTION_NONE),
             "TRANSACTION_NONE"
     );
-    private static AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
+    private static final AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
             String.valueOf(Connection.TRANSACTION_SERIALIZABLE),
             "TRANSACTION_SERIALIZABLE"
     );
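Marking the `AllowableValue` constants in AbstractQueryDatabaseTable `final` makes the compiler enforce what was always intended: shared static configuration assigned exactly once. A tiny illustration (hypothetical class, not NiFi code) of what a non-final static silently permits:

```java
public class ConstantsSketch {
    private static String MUTABLE = "TRANSACTION_NONE";        // reassignment compiles
    private static final String CONSTANT = "TRANSACTION_NONE"; // reassignment is a compile error

    public static void main(String[] args) {
        MUTABLE = "oops";        // legal, and easy to miss in review
        // CONSTANT = "oops";    // javac: cannot assign a value to final variable CONSTANT
        System.out.println(MUTABLE + " / " + CONSTANT);
    }
}
```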
@@ -17,13 +17,13 @@

 package org.apache.nifi.processors.standard;

-import org.apache.commons.text.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -34,23 +34,19 @@
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;

-import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.ArrayList;

 @SideEffectFree
 @SupportsBatching
@@ -150,45 +146,40 @@ public class AttributesToCSV extends AbstractProcessor {
.defaultValue("false")
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ATTRIBUTES_LIST,
ATTRIBUTES_REGEX,
DESTINATION,
INCLUDE_CORE_ATTRIBUTES,
NULL_VALUE_FOR_EMPTY_STRING,
INCLUDE_SCHEMA
);

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to CSV").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to CSV").build();

private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final Set<Relationship> RELATIONSHIPS = Set.of(
REL_SUCCESS,
REL_FAILURE
);

private volatile Boolean includeCoreAttributes;
private volatile Set<String> coreAttributes;
private volatile boolean destinationContent;
private volatile boolean nullValForEmptyString;
private volatile Pattern pattern;
private volatile Boolean includeSchema;

@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
properties.add(ATTRIBUTES_REGEX);
properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(INCLUDE_SCHEMA);
this.properties = Collections.unmodifiableList(properties);

final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}


@@ -311,7 +302,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         if (destinationContent) {
             FlowFile conFlowfile = session.write(original, (in, out) -> {
                 if (includeSchema) {
-                    sbNames.append(System.getProperty("line.separator"));
+                    sbNames.append(System.lineSeparator());
                     out.write(sbNames.toString().getBytes());
                 }
                 out.write(sbValues.toString().getBytes());
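The AttributesToCSV changes follow two patterns repeated across this PR: the `init(ProcessorInitializationContext)` override that built mutable collections and wrapped them with `Collections.unmodifiable*` is replaced by `static final` constants built with `List.of(...)` and `Set.of(...)`, and `System.getProperty("line.separator")` becomes the equivalent but more direct `System.lineSeparator()` (available since Java 7, with no string-keyed property lookup or typo risk). A minimal sketch of the descriptor pattern, using hypothetical stand-in types rather than the NiFi API:

```java
import java.util.List;
import java.util.Set;

public class DescriptorStyleSketch {
    record Relationship(String name) { } // stand-in for the NiFi Relationship type

    static final Relationship REL_SUCCESS = new Relationship("success");
    static final Relationship REL_FAILURE = new Relationship("failure");

    // One immutable instance shared by every processor instance, built at class load;
    // Set.of/List.of also reject nulls and duplicate set elements up front.
    static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    static final List<String> PROPERTIES = List.of("Attributes List", "Destination");

    public static void main(String[] args) {
        System.out.println(RELATIONSHIPS.contains(REL_FAILURE)); // true
        System.out.println(PROPERTIES); // [Attributes List, Destination]
    }
}
```

Besides trimming boilerplate, this moves a per-instance allocation to a single class-load-time constant and deletes the now-unused `ProcessorInitializationContext`, `ArrayList`, `HashSet`, and `Collections` imports.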
@@ -36,16 +36,13 @@
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;

 import java.io.BufferedOutputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -157,7 +154,7 @@ public String getDescription() {
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .allowableValues(JsonHandlingStrategy.class)
-            .defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED.getValue())
+            .defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED)
             .build();

     public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
@@ -170,14 +167,27 @@ public String getDescription() {
             .dependsOn(DESTINATION, DESTINATION_CONTENT)
             .build();

+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            ATTRIBUTES_LIST,
+            ATTRIBUTES_REGEX,
+            DESTINATION,
+            INCLUDE_CORE_ATTRIBUTES,
+            NULL_VALUE_FOR_EMPTY_STRING,
+            JSON_HANDLING_STRATEGY,
+            PRETTY_PRINT
+    );
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("Successfully converted attributes to JSON").build();
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
             .description("Failed to convert attributes to JSON").build();

+    private static final Set<Relationship> RELATIONSHIPS = Set.of(
+            REL_SUCCESS,
+            REL_FAILURE
+    );
+
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
     private volatile Set<String> attributesToRemove;
     private volatile Set<String> attributes;
     private volatile Boolean nullValueForEmptyString;
@@ -186,32 +196,14 @@ public String getDescription() {
     private volatile Pattern pattern;
     private volatile JsonHandlingStrategy jsonHandlingStrategy;

-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(ATTRIBUTES_LIST);
-        properties.add(ATTRIBUTES_REGEX);
-        properties.add(DESTINATION);
-        properties.add(INCLUDE_CORE_ATTRIBUTES);
-        properties.add(NULL_VALUE_FOR_EMPTY_STRING);
-        properties.add(JSON_HANDLING_STRATEGY);
-        properties.add(PRETTY_PRINT);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
+        return PROPERTIES;
     }

     @Override
     public Set<Relationship> getRelationships() {
-        return relationships;
+        return RELATIONSHIPS;
     }

     /**
@@ -272,15 +264,15 @@ private Set<String> buildAtrs(String atrList) {

     @OnScheduled
     public void onScheduled(ProcessContext context) {
-        attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values())
+        attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Set.of() : Arrays.stream(CoreAttributes.values())
                 .map(CoreAttributes::key)
                 .collect(Collectors.toSet());
         attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue());
         nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
         destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
         final boolean prettyPrint = context.getProperty(PRETTY_PRINT).asBoolean();
         objectWriter = destinationContent && prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter() : OBJECT_MAPPER.writer();
-        jsonHandlingStrategy = JsonHandlingStrategy.valueOf(context.getProperty(JSON_HANDLING_STRATEGY).getValue());
+        jsonHandlingStrategy = context.getProperty(JSON_HANDLING_STRATEGY).asAllowableValue(JsonHandlingStrategy.class);

         if (context.getProperty(ATTRIBUTES_REGEX).isSet()) {
             pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue());
         }