Skip to content

Commit

Permalink
NIFI-12491 Added Starting Row Schema Strategy to ExcelReader (#9081)
Browse files Browse the repository at this point in the history
Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
dan-s1 authored Jul 16, 2024
1 parent 5493fde commit 2195956
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.excel;

import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.FieldTypeInference;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.SchemaInferenceUtil;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.usermodel.Row;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Schema Access Strategy that treats the configured starting row of an Excel worksheet as a
 * header line. Column names are taken from the header row's cells and each column's type is
 * inferred from up to {@link #NUM_ROWS_TO_DETERMINE_TYPES} rows following the header.
 */
public class ExcelHeaderSchemaStrategy implements SchemaAccessStrategy {
    private static final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);

    // Number of rows after the header sampled for type inference. NOTE: This number is arbitrary.
    static final int NUM_ROWS_TO_DETERMINE_TYPES = 10;

    static final AllowableValue USE_STARTING_ROW = new AllowableValue("Use Starting Row", "Use Starting Row",
            "The configured first row of the Excel file is a header line that contains the names of the columns. The schema will be derived by using the "
            + "column names in the header and the following " + NUM_ROWS_TO_DETERMINE_TYPES + " rows to determine the type(s) of each column");

    private final PropertyContext context;
    private final ComponentLog logger;
    private final TimeValueInference timeValueInference;
    private final DataFormatter dataFormatter;

    /**
     * @param context            property context used to resolve reader configuration; may be null
     *                           when the strategy is constructed only for validation purposes
     * @param logger             component logger passed through to the row iterator
     * @param timeValueInference inference helper for date/time/timestamp typed cells
     * @param locale             locale for cell formatting; null uses DataFormatter's default
     */
    public ExcelHeaderSchemaStrategy(PropertyContext context, ComponentLog logger, TimeValueInference timeValueInference, Locale locale) {
        this.context = context;
        this.logger = logger;
        this.timeValueInference = timeValueInference;
        this.dataFormatter = locale == null ? new DataFormatter() : new DataFormatter(locale);
    }

    /**
     * Reads the header row and up to {@link #NUM_ROWS_TO_DETERMINE_TYPES} data rows from the
     * worksheet content and derives a RecordSchema.
     *
     * @throws SchemaNotFoundException if the strategy has no context, the header row has no
     *                                 cells, a data row is wider than the header, or no rows
     *                                 could be read at all
     */
    @Override
    public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        if (this.context == null) {
            throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
        }

        final String requiredSheetsDelimited = context.getProperty(ExcelReader.REQUIRED_SHEETS).evaluateAttributeExpressions(variables).getValue();
        final List<String> requiredSheets = ExcelReader.getRequiredSheets(requiredSheetsDelimited);
        final Integer rawFirstRow = context.getProperty(ExcelReader.STARTING_ROW).evaluateAttributeExpressions(variables).asInteger();
        // Fall back to the descriptor default when Expression Language evaluates to no value.
        final int firstRow = rawFirstRow == null ? NumberUtils.toInt(ExcelReader.STARTING_ROW.getDefaultValue()) : rawFirstRow;
        final int zeroBasedFirstRow = ExcelReader.getZeroBasedIndex(firstRow);
        final String password = context.getProperty(ExcelReader.PASSWORD).getValue();
        final ExcelRecordReaderConfiguration configuration = new ExcelRecordReaderConfiguration.Builder()
                .withRequiredSheets(requiredSheets)
                .withFirstRow(zeroBasedFirstRow)
                .withPassword(password)
                .build();

        final RowIterator rowIterator = new RowIterator(contentStream, configuration, logger);
        final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>();
        List<String> fieldNames = null;
        int index = 0;

        while (rowIterator.hasNext()) {
            Row row = rowIterator.next();
            if (index == 0) {
                fieldNames = getFieldNames(firstRow, row);
                // Seed the type map with every header column so that columns which happen to be
                // empty in all sampled data rows (or a header-only worksheet) still contribute a
                // field to the schema instead of being silently dropped.
                for (final String fieldName : fieldNames) {
                    typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
                }
            } else if (index <= NUM_ROWS_TO_DETERMINE_TYPES) {
                inferSchema(row, fieldNames, typeMap);
            } else {
                // Enough rows sampled for type inference
                break;
            }

            index++;
        }

        // Only empty when no rows at all were available from the worksheet
        if (typeMap.isEmpty()) {
            final String message = String.format("Failed to infer schema from empty first %d rows", NUM_ROWS_TO_DETERMINE_TYPES);
            throw new SchemaNotFoundException(message);
        }
        return createSchema(typeMap);
    }

    /**
     * Extracts the column names from the header row. Empty header cells are assigned generated
     * names of the form {@code column_<index>}.
     *
     * @param firstRowIndex one-based header row number, used only for the error message
     * @throws SchemaNotFoundException if the header row contains no cells with data
     */
    private List<String> getFieldNames(int firstRowIndex, Row row) throws SchemaNotFoundException {
        if (!ExcelUtils.hasCells(row)) {
            throw new SchemaNotFoundException(String.format("Field names could not be determined from configured header row %s, as this row has no cells with data", firstRowIndex));
        }

        final List<String> fieldNames = new ArrayList<>();
        for (int index = 0; index < row.getLastCellNum(); index++) {
            final Cell cell = row.getCell(index);
            final String fieldName = dataFormatter.formatCellValue(cell);

            // NOTE: This accounts for column(s) which may be empty in the configured starting row.
            if (fieldName == null || fieldName.isEmpty()) {
                fieldNames.add(ExcelUtils.FIELD_NAME_PREFIX + index);
            } else {
                fieldNames.add(fieldName);
            }
        }

        return fieldNames;
    }

    /**
     * Feeds the formatted value of each cell in the given data row into the corresponding
     * column's type inference. Blank rows are ignored.
     *
     * @throws SchemaNotFoundException if the row has more cells than the header declared columns
     */
    private void inferSchema(final Row row, final List<String> fieldNames, final Map<String, FieldTypeInference> typeMap) throws SchemaNotFoundException {
        // NOTE: This allows rows to be blank when inferring the schema
        if (ExcelUtils.hasCells(row)) {
            if (row.getLastCellNum() > fieldNames.size()) {
                throw new SchemaNotFoundException(String.format("Row %s has %s cells, more than the expected %s number of field names", row.getRowNum(), row.getLastCellNum(), fieldNames.size()));
            }

            IntStream.range(0, row.getLastCellNum())
                    .forEach(index -> {
                        final Cell cell = row.getCell(index);
                        final String fieldName = fieldNames.get(index);
                        final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
                        final String formattedCellValue = dataFormatter.formatCellValue(cell);
                        final DataType dataType = SchemaInferenceUtil.getDataType(formattedCellValue, timeValueInference);
                        typeInference.addPossibleDataType(dataType);
                    });
        }
    }

    /**
     * Builds the RecordSchema from the accumulated per-column type inferences, preserving the
     * header's column order (the map is a LinkedHashMap). All fields are nullable.
     */
    private RecordSchema createSchema(final Map<String, FieldTypeInference> inferences) {
        final List<RecordField> recordFields = inferences.entrySet().stream()
                .map(entry -> new RecordField(entry.getKey(), entry.getValue().toDataType(), true))
                .collect(Collectors.toList());
        return new SimpleRecordSchema(recordFields);
    }

    /**
     * This strategy derives the schema from content alone, so no schema fields are supplied.
     */
    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return schemaFields;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
import org.apache.nifi.schema.inference.RecordSourceFactory;
Expand Down Expand Up @@ -104,8 +105,9 @@ public String getDescription() {
public static final PropertyDescriptor STARTING_ROW = new PropertyDescriptor
.Builder().name("Starting Row")
.displayName("Starting Row")
.description("The row number of the first row to start processing (One based)."
+ " Use this to skip over rows of data at the top of a worksheet that are not part of the dataset.")
.description("The row number of the first row to start processing (One based)." +
" Use this to skip over rows of data at the top of a worksheet that are not part of the dataset." +
" When using the '" + ExcelHeaderSchemaStrategy.USE_STARTING_ROW.getValue() + "' strategy this should be the column header row.")
.required(true)
.defaultValue("1")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
Expand Down Expand Up @@ -184,7 +186,9 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(allowableValue)) {
if (allowableValue.equalsIgnoreCase(ExcelHeaderSchemaStrategy.USE_STARTING_ROW.getValue())) {
return new ExcelHeaderSchemaStrategy(context, getLogger(), new TimeValueInference(dateFormat, timeFormat, timestampFormat), null);
} else if (SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(allowableValue)) {
final RecordSourceFactory<Row> sourceFactory = (variables, in) -> new ExcelRecordSource(in, context, variables, getLogger());
final SchemaInferenceEngine<Row> inference = new ExcelSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
return new InferSchemaAccessStrategy<>(sourceFactory, inference, getLogger());
Expand All @@ -196,17 +200,23 @@ protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableVal
/**
 * Extends the parent reader's Schema Access Strategies with the Excel-specific
 * header-row strategy and generic schema inference.
 */
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> strategies = new ArrayList<>(super.getSchemaAccessStrategyValues());
strategies.addAll(List.of(ExcelHeaderSchemaStrategy.USE_STARTING_ROW, SchemaInferenceUtil.INFER_SCHEMA));
return strategies;
}

/**
 * Returns the default Schema Access Strategy for this reader. The header-row strategy
 * replaces the previous default of schema inference (see the change on this method's
 * former return value).
 */
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return ExcelHeaderSchemaStrategy.USE_STARTING_ROW;
}

/**
 * Resolves the zero-based index of the first data row to read.
 *
 * When the header-row Schema Access Strategy is selected, the configured Starting Row is the
 * column-header row, so record data begins one row later.
 *
 * @param variables attributes used for Expression Language evaluation of the Starting Row
 * @return zero-based index of the first data row
 */
private int getStartingRow(final Map<String, String> variables) {
final Integer rawStartingRow = configurationContext.getProperty(STARTING_ROW).evaluateAttributeExpressions(variables).asInteger();
// Fall back to the descriptor default when Expression Language yields no value, avoiding a
// NullPointerException on unboxing (mirrors ExcelHeaderSchemaStrategy.getSchema)
int startingRow = rawStartingRow == null ? Integer.parseInt(STARTING_ROW.getDefaultValue()) : rawStartingRow;
final String schemaAccessStrategy = configurationContext.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();

// Case-insensitive comparison for consistency with getSchemaAccessStrategy's matching
if (ExcelHeaderSchemaStrategy.USE_STARTING_ROW.getValue().equalsIgnoreCase(schemaAccessStrategy)) {
startingRow++;
}
return getZeroBasedIndex(startingRow);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.stream.IntStream;

public class ExcelSchemaInference implements SchemaInferenceEngine<Row> {
static final String FIELD_NAME_PREFIX = "column_";
private final TimeValueInference timeValueInference;
private final DataFormatter dataFormatter;

Expand Down Expand Up @@ -66,7 +65,7 @@ private void inferSchema(final Row row, final Map<String, FieldTypeInference> ty
IntStream.range(0, row.getLastCellNum())
.forEach(index -> {
final Cell cell = row.getCell(index);
final String fieldName = FIELD_NAME_PREFIX + index;
final String fieldName = ExcelUtils.FIELD_NAME_PREFIX + index;
final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
final String formattedCellValue = dataFormatter.formatCellValue(cell);
final DataType dataType = SchemaInferenceUtil.getDataType(formattedCellValue, timeValueInference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.poi.ss.usermodel.Row;

public class ExcelUtils {
static final String FIELD_NAME_PREFIX = "column_";

// Utility class: private constructor prevents instantiation
private ExcelUtils() {
}

Expand Down
Loading

0 comments on commit 2195956

Please sign in to comment.