Skip to content

Commit

Permalink
NIFI-13590 Refactor components in standard-processors bundle using current API methods
Browse files Browse the repository at this point in the history
  • Loading branch information
EndzeitBegins committed Jul 28, 2024
1 parent beefa2a commit 058a551
Show file tree
Hide file tree
Showing 139 changed files with 1,838 additions and 3,480 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand All @@ -67,18 +66,18 @@
import static java.sql.Types.BIT;
import static java.sql.Types.BLOB;
import static java.sql.Types.BOOLEAN;
import static java.sql.Types.CHAR;
import static java.sql.Types.CLOB;
import static java.sql.Types.DATE;
import static java.sql.Types.DECIMAL;
import static java.sql.Types.DOUBLE;
import static java.sql.Types.FLOAT;
import static java.sql.Types.INTEGER;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.CHAR;
import static java.sql.Types.DATE;
import static java.sql.Types.LONGNVARCHAR;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.NVARCHAR;
import static java.sql.Types.REAL;
import static java.sql.Types.ROWID;
Expand Down Expand Up @@ -283,10 +282,10 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
columnTypeMap.clear();
}

final List<String> maxValueColumnNameList = Arrays.asList(maxValueColumnNames.toLowerCase().split(","));
final String[] maxValueColumnNameList = maxValueColumnNames.toLowerCase().split(",");
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();

for (String maxValueColumn:maxValueColumnNameList) {
for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
maxValueQualifiedColumnNameList.add(colKey);
}
Expand All @@ -304,7 +303,7 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
columnTypeMap.putIfAbsent(colKey, colType);
}

for (String maxValueColumn:maxValueColumnNameList) {
for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,13 @@ protected Pair<String, SQLException> executeConfigStatements(final Connection co
* Extract list of queries from config property
*/
protected List<String> getQueries(final String value) {
if (value == null || value.length() == 0 || value.trim().length() == 0) {
if (value == null || value.isEmpty() || value.isBlank()) {
return null;
}
final List<String> queries = new LinkedList<>();
for (String query : value.split("(?<!\\\\);")) {
query = query.replaceAll("\\\\;", ";");
if (query.trim().length() > 0) {
if (!query.isBlank()) {
queries.add(query.trim());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";

private static AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
private static final AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
String.valueOf(Connection.TRANSACTION_READ_COMMITTED),
"TRANSACTION_READ_COMMITTED"
);
private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
private static final AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED),
"TRANSACTION_READ_UNCOMMITTED"
);
private static AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
private static final AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
String.valueOf(Connection.TRANSACTION_REPEATABLE_READ),
"TRANSACTION_REPEATABLE_READ"
);
private static AllowableValue TRANSACTION_NONE = new AllowableValue(
private static final AllowableValue TRANSACTION_NONE = new AllowableValue(
String.valueOf(Connection.TRANSACTION_NONE),
"TRANSACTION_NONE"
);
private static AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
private static final AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
String.valueOf(Connection.TRANSACTION_SERIALIZABLE),
"TRANSACTION_SERIALIZABLE"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.nifi.processors.standard;

import org.apache.commons.text.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand All @@ -34,23 +34,19 @@
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.Collections;
import java.util.Arrays;
import java.util.ArrayList;

@SideEffectFree
@SupportsBatching
Expand Down Expand Up @@ -150,45 +146,33 @@ public class AttributesToCSV extends AbstractProcessor {
.defaultValue("false")
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ATTRIBUTES_LIST, ATTRIBUTES_REGEX, DESTINATION,
INCLUDE_CORE_ATTRIBUTES, NULL_VALUE_FOR_EMPTY_STRING, INCLUDE_SCHEMA
);

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to CSV").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to CSV").build();

private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

private volatile Boolean includeCoreAttributes;
private volatile Set<String> coreAttributes;
private volatile boolean destinationContent;
private volatile boolean nullValForEmptyString;
private volatile Pattern pattern;
private volatile Boolean includeSchema;

@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
properties.add(ATTRIBUTES_REGEX);
properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(INCLUDE_SCHEMA);
this.properties = Collections.unmodifiableList(properties);

final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    // Returns the shared, immutable descriptor list (built once via List.of);
    // the scraped diff left both the old and new return lines here — only the
    // committed version (the static constant) is kept.
    return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
    // Returns the shared, immutable relationship set (built once via Set.of);
    // resolves the diff residue of two return statements to the committed one.
    return RELATIONSHIPS;
}


Expand Down Expand Up @@ -311,7 +295,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
if (destinationContent) {
FlowFile conFlowfile = session.write(original, (in, out) -> {
if (includeSchema) {
sbNames.append(System.getProperty("line.separator"));
sbNames.append(System.lineSeparator());
out.write(sbNames.toString().getBytes());
}
out.write(sbValues.toString().getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -157,7 +154,7 @@ public String getDescription() {
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(JsonHandlingStrategy.class)
.defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED.getValue())
.defaultValue(AttributesToJSON.JsonHandlingStrategy.ESCAPED)
.build();

public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
Expand All @@ -170,14 +167,19 @@ public String getDescription() {
.dependsOn(DESTINATION, DESTINATION_CONTENT)
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ATTRIBUTES_LIST, ATTRIBUTES_REGEX, DESTINATION, INCLUDE_CORE_ATTRIBUTES,
NULL_VALUE_FOR_EMPTY_STRING, JSON_HANDLING_STRATEGY, PRETTY_PRINT
);

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to JSON").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to JSON").build();

private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private volatile Set<String> attributesToRemove;
private volatile Set<String> attributes;
private volatile Boolean nullValueForEmptyString;
Expand All @@ -186,32 +188,14 @@ public String getDescription() {
private volatile Pattern pattern;
private volatile JsonHandlingStrategy jsonHandlingStrategy;

@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ATTRIBUTES_LIST);
properties.add(ATTRIBUTES_REGEX);
properties.add(DESTINATION);
properties.add(INCLUDE_CORE_ATTRIBUTES);
properties.add(NULL_VALUE_FOR_EMPTY_STRING);
properties.add(JSON_HANDLING_STRATEGY);
properties.add(PRETTY_PRINT);
this.properties = Collections.unmodifiableList(properties);

final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    // Returns the static, unmodifiable property list; the instance field
    // variant (old diff line) was removed in this commit.
    return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
    // Returns the static, unmodifiable relationship set; the instance field
    // variant (old diff line) was removed in this commit.
    return RELATIONSHIPS;
}

/**
Expand Down Expand Up @@ -272,15 +256,15 @@ private Set<String> buildAtrs(String atrList) {

@OnScheduled
public void onScheduled(ProcessContext context) {
attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET : Arrays.stream(CoreAttributes.values())
attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Set.of() : Arrays.stream(CoreAttributes.values())
.map(CoreAttributes::key)
.collect(Collectors.toSet());
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue());
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
final boolean prettyPrint = context.getProperty(PRETTY_PRINT).asBoolean();
objectWriter = destinationContent && prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter() : OBJECT_MAPPER.writer();
jsonHandlingStrategy = JsonHandlingStrategy.valueOf(context.getProperty(JSON_HANDLING_STRATEGY).getValue());
jsonHandlingStrategy = context.getProperty(JSON_HANDLING_STRATEGY).asAllowableValue(JsonHandlingStrategy.class);

if (context.getProperty(ATTRIBUTES_REGEX).isSet()) {
pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -87,6 +86,8 @@ public class CalculateRecordStats extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final List<PropertyDescriptor> PROPERTIES = List.of(RECORD_READER, LIMIT);

static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("If a flowfile is successfully processed, it goes here.")
Expand All @@ -96,21 +97,9 @@ public class CalculateRecordStats extends AbstractProcessor {
.description("If a flowfile fails to be processed, it goes here.")
.build();

private RecordPathCache cache;
static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

static final Set RELATIONSHIPS;
static final List<PropertyDescriptor> PROPERTIES;

static {
Set _rels = new HashSet();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
RELATIONSHIPS = Collections.unmodifiableSet(_rels);
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.add(RECORD_READER);
_temp.add(LIMIT);
PROPERTIES = Collections.unmodifiableList(_temp);
}
private RecordPathCache cache;

protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -160,7 +149,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
return context.getProperties().keySet()
.stream().filter(p -> p.isDynamic())
.stream().filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toMap(
e -> e.getName(),
e -> {
Expand Down Expand Up @@ -189,7 +178,7 @@ protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath
String approxValue = value.get().getValue().toString();
String baseKey = String.format("recordStats.%s", entry.getKey());
String key = String.format("%s.%s", baseKey, approxValue);
Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
Integer stat = retVal.getOrDefault(key, 0);
Integer baseStat = retVal.getOrDefault(baseKey, 0);
stat++;
baseStat++;
Expand Down Expand Up @@ -224,10 +213,10 @@ protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath
protected Map filterBySize(Map<String, Integer> values, Integer limit, List<String> baseKeys) {
Map<String, Integer> toFilter = values.entrySet().stream()
.filter(e -> !baseKeys.contains(e.getKey()))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, Integer> retVal = values.entrySet().stream()
.filter((e -> baseKeys.contains(e.getKey())))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

List<Map.Entry<String, Integer>> _flat = new ArrayList<>(toFilter.entrySet());
_flat.sort(Map.Entry.comparingByValue());
Expand Down
Loading

0 comments on commit 058a551

Please sign in to comment.