Skip to content

Commit

Permalink
NIFI-13590 Refactored Standard Processors using current API methods
Browse files Browse the repository at this point in the history
This closes #9119

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
EndzeitBegins authored and exceptionfactory committed Aug 2, 2024
1 parent 83d2078 commit 4588449
Show file tree
Hide file tree
Showing 141 changed files with 2,815 additions and 3,475 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand All @@ -67,18 +66,18 @@
import static java.sql.Types.BIT;
import static java.sql.Types.BLOB;
import static java.sql.Types.BOOLEAN;
import static java.sql.Types.CHAR;
import static java.sql.Types.CLOB;
import static java.sql.Types.DATE;
import static java.sql.Types.DECIMAL;
import static java.sql.Types.DOUBLE;
import static java.sql.Types.FLOAT;
import static java.sql.Types.INTEGER;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.CHAR;
import static java.sql.Types.DATE;
import static java.sql.Types.LONGNVARCHAR;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.NVARCHAR;
import static java.sql.Types.REAL;
import static java.sql.Types.ROWID;
Expand Down Expand Up @@ -283,10 +282,10 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
columnTypeMap.clear();
}

final List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.toLowerCase().split(","));
final String[] maxValueColumnNameList = maxValueColumnNames.toLowerCase().split(",");
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();

for (String maxValueColumn:maxValueColumnNameList) {
for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
maxValueQualifiedColumnNameList.add(colKey);
}
Expand All @@ -304,7 +303,7 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
columnTypeMap.putIfAbsent(colKey, colType);
}

for (String maxValueColumn:maxValueColumnNameList) {
for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,13 @@ protected Pair<String, SQLException> executeConfigStatements(final Connection co
* Extract list of queries from config property
*/
protected List<String> getQueries(final String value) {
if (value == null || value.length() == 0 || value.trim().length() == 0) {
if (value == null || value.isEmpty() || value.isBlank()) {
return null;
}
final List<String> queries = new LinkedList<>();
for (String query : value.split("(?<!\\\\);")) {
query = query.replaceAll("\\\\;", ";");
if (query.trim().length() > 0) {
if (!query.isBlank()) {
queries.add(query.trim());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -53,15 +52,13 @@
*/
public abstract class AbstractJsonPathProcessor extends AbstractProcessor {

static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>();

static final String EMPTY_STRING_OPTION = "empty string";
static final String NULL_STRING_OPTION = "the string 'null'";

static {
NULL_REPRESENTATION_MAP.put(EMPTY_STRING_OPTION, "");
NULL_REPRESENTATION_MAP.put(NULL_STRING_OPTION, "null");
}
static final Map<String, String> NULL_REPRESENTATION_MAP = Map.of(
EMPTY_STRING_OPTION, "",
NULL_STRING_OPTION, "null"
);

public static final PropertyDescriptor NULL_VALUE_DEFAULT_REPRESENTATION = new PropertyDescriptor.Builder()
.name("Null Value Representation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";

private static AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
private static final AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
String.valueOf(Connection.TRANSACTION_READ_COMMITTED),
"TRANSACTION_READ_COMMITTED"
);
private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
private static final AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED),
"TRANSACTION_READ_UNCOMMITTED"
);
private static AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
private static final AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
String.valueOf(Connection.TRANSACTION_REPEATABLE_READ),
"TRANSACTION_REPEATABLE_READ"
);
private static AllowableValue TRANSACTION_NONE = new AllowableValue(
private static final AllowableValue TRANSACTION_NONE = new AllowableValue(
String.valueOf(Connection.TRANSACTION_NONE),
"TRANSACTION_NONE"
);
private static AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
private static final AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
String.valueOf(Connection.TRANSACTION_SERIALIZABLE),
"TRANSACTION_SERIALIZABLE"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.nifi.processors.standard;

import org.apache.commons.text.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand All @@ -34,23 +34,19 @@
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.Collections;
import java.util.Arrays;
import java.util.ArrayList;

@SideEffectFree
@SupportsBatching
Expand Down Expand Up @@ -150,45 +146,40 @@ public class AttributesToCSV extends AbstractProcessor {
.defaultValue("false")
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ATTRIBUTES_LIST,
ATTRIBUTES_REGEX,
DESTINATION,
INCLUDE_CORE_ATTRIBUTES,
NULL_VALUE_FOR_EMPTY_STRING,
INCLUDE_SCHEMA
);

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to CSV").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to CSV").build();

private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final Set<Relationship> RELATIONSHIPS = Set.of(
REL_SUCCESS,
REL_FAILURE
);

private volatile Boolean includeCoreAttributes;
private volatile Set<String> coreAttributes;
private volatile boolean destinationContent;
private volatile boolean nullValForEmptyString;
private volatile Pattern pattern;
private volatile Boolean includeSchema;

@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
properties.add(ATTRIBUTES_REGEX);
properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(INCLUDE_SCHEMA);
this.properties = Collections.unmodifiableList(properties);

final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}


Expand Down Expand Up @@ -311,7 +302,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
if (destinationContent) {
FlowFile conFlowfile = session.write(original, (in, out) -> {
if (includeSchema) {
sbNames.append(System.getProperty("line.separator"));
sbNames.append(System.lineSeparator());
out.write(sbNames.toString().getBytes());
}
out.write(sbValues.toString().getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -157,7 +154,7 @@ public String getDescription() {
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(JsonHandlingStrategy.class)
.defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED.getValue())
.defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED)
.build();

public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
Expand All @@ -170,14 +167,27 @@ public String getDescription() {
.dependsOn(DESTINATION, DESTINATION_CONTENT)
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ATTRIBUTES_LIST,
ATTRIBUTES_REGEX,
DESTINATION,
INCLUDE_CORE_ATTRIBUTES,
NULL_VALUE_FOR_EMPTY_STRING,
JSON_HANDLING_STRATEGY,
PRETTY_PRINT
);

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to JSON").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to JSON").build();

private static final Set<Relationship> RELATIONSHIPS = Set.of(
REL_SUCCESS,
REL_FAILURE
);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private volatile Set<String> attributesToRemove;
private volatile Set<String> attributes;
private volatile Boolean nullValueForEmptyString;
Expand All @@ -186,32 +196,14 @@ public String getDescription() {
private volatile Pattern pattern;
private volatile JsonHandlingStrategy jsonHandlingStrategy;

@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
properties.add(ATTRIBUTES_REGEX);
properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(JSON_HANDLING_STRATEGY);
properties.add(PRETTY_PRINT);
this.properties = Collections.unmodifiableList(properties);

final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}

/**
Expand Down Expand Up @@ -272,15 +264,15 @@ private Set<String> buildAtrs(String atrList) {

@OnScheduled
public void onScheduled(ProcessContext context) {
attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values())
attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Set.of() : Arrays.stream(CoreAttributes.values())
.map(CoreAttributes::key)
.collect(Collectors.toSet());
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue());
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
final boolean prettyPrint = context.getProperty(PRETTY_PRINT).asBoolean();
objectWriter = destinationContent && prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter() : OBJECT_MAPPER.writer();
jsonHandlingStrategy = JsonHandlingStrategy.valueOf(context.getProperty(JSON_HANDLING_STRATEGY).getValue());
jsonHandlingStrategy = context.getProperty(JSON_HANDLING_STRATEGY).asAllowableValue(JsonHandlingStrategy.class);

if (context.getProperty(ATTRIBUTES_REGEX).isSet()) {
pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue());
Expand Down
Loading

0 comments on commit 4588449

Please sign in to comment.