From aebd8d278a2be31482af09cc3685b842aed3231e Mon Sep 17 00:00:00 2001 From: Matthew Peterson Date: Thu, 24 Feb 2022 21:37:47 +0000 Subject: [PATCH] Fix #1438 - optionally gather fields after ingest exception --- .../ingest/mapreduce/EventMapper.java | 249 ++++------ .../ingest/mapreduce/FieldHarvester.java | 255 ++++++++++ .../ingest/mapreduce/FieldSalvager.java | 24 + .../edge/ProtobufEdgeDataTypeHandler.java | 4 +- .../data/RawRecordContainerImplTest.java | 4 +- .../data/config/DataTypeHelperImplTest.java | 12 +- .../config/NormalizedFieldAndValueTest.java | 4 +- .../FieldNameAliaserNormalizerTest.java | 6 +- .../MinimalistIngestHelperInterfaceImpl.java | 246 ++++++++++ .../normalizer/AbstractNormalizerTest.java | 4 +- .../EventMapperSalvageFieldsOnErrorTest.java | 233 +++++++++ .../ingest/mapreduce/EventMapperTest.java | 32 +- .../ingest/mapreduce/FieldHarvesterTest.java | 464 ++++++++++++++++++ .../ingest/mapreduce/IngestTestSetup.java | 65 +++ .../DateIndexDataTypeHandlerTest.java | 4 +- .../edge/ProtobufEdgeDeleteModeTest.java | 6 +- .../job/CBMutationOutputFormatterTest.java | 14 +- .../datawave/util/TypeRegistryTestSetup.java | 20 + 18 files changed, 1433 insertions(+), 213 deletions(-) create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index 6ccf80c8fa2..ba37b2e6538 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -3,18 +3,13 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import datawave.data.normalizer.DateNormalizer; import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelper; import datawave.ingest.data.config.NormalizedContentInterface; -import datawave.ingest.data.config.NormalizedFieldAndValue; import datawave.ingest.data.config.filter.KeyValueFilter; -import datawave.ingest.data.config.ingest.CompositeIngest; -import datawave.ingest.data.config.ingest.FilterIngest; import datawave.ingest.data.config.ingest.IngestHelperInterface; -import datawave.ingest.data.config.ingest.VirtualIngest; import datawave.ingest.input.reader.event.EventErrorSummary; import datawave.ingest.mapreduce.handler.DataTypeHandler; import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; @@ -56,12 +51,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import 
java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.Stack; @@ -92,9 +85,6 @@ * output value */ public class EventMapper extends StatsDEnabledMapper { - - private static final String SRC_FILE_DEL = "|"; - private static final Logger log = Logger.getLogger(EventMapper.class); /** @@ -107,24 +97,6 @@ public class EventMapper extends StatsDE public static final String CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS = "ingest.event.mapper.context.writer.output.table.counters"; public static final String FILE_NAME_COUNTERS = "ingest.event.mapper.file.name.counters"; - protected boolean createSequenceFileName = true; - - protected boolean trimSequenceFileName = true; - - protected boolean createRawFileName = true; - - public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE"; - - public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE"; - - public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename"; - - public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename"; - - public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; - - public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename"; - public static final String ID_FILTER_FSTS = "ingest.event.mapper.id.filter.fsts"; protected Map>> typeMap = new HashMap<>(); @@ -142,8 +114,6 @@ public class EventMapper extends StatsDE private StandaloneStatusReporter reporter = new StandaloneStatusReporter(); - private DateNormalizer dateNormalizer = new DateNormalizer(); - private ContextWriter contextWriter = null; protected long offset = 0; @@ -160,6 +130,8 @@ public class EventMapper extends StatsDE private MetricsService metricsService; private ReusableMetricsLabels metricsLabels; + private FieldHarvester fieldHarvester; + /** * Set up the datatype handlers */ @@ -186,12 +158,8 @@ public void setup(Context context) throws IOException, InterruptedException { interval = context.getConfiguration().getLong(DISCARD_INTERVAL, 0l); - // default to true, but it can be disabled - createSequenceFileName = context.getConfiguration().getBoolean(LOAD_SEQUENCE_FILE_NAME, true); - - trimSequenceFileName = context.getConfiguration().getBoolean(TRIM_SEQUENCE_FILE_NAME, true); - - createRawFileName = context.getConfiguration().getBoolean(LOAD_RAW_FILE_NAME, true); + // FieldHarvester encapsulates the addition of virtual fields, composite fields, LOAD_DATE, etc. 
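+ // It honors the same configuration keys EventMapper previously read directly: LOAD_SEQUENCE_FILE_NAME, TRIM_SEQUENCE_FILE_NAME and LOAD_RAW_FILE_NAME (each defaults to true).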
+ fieldHarvester = new FieldHarvester(context.getConfiguration()); Class> firstFilter = null; @@ -287,7 +255,7 @@ public void setup(Context context) throws IOException, InterruptedException { * * @return the data type handlers */ - private List> loadDataType(String typeStr, Context context) { + private List> loadDataTypeHandlers(String typeStr, Context context) { // Do not load the type twice if (!typeMap.containsKey(typeStr)) { @@ -388,7 +356,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt } // ensure this datatype's handlers etc are loaded such that the dataTypeDiscardIntervalCache and validators are filled as well - List> typeHandlers = loadDataType(value.getDataType().typeName(), context); + List> typeHandlers = loadDataTypeHandlers(value.getDataType().typeName(), context); // This is a little bit fragile, but there is no other way // to get the context on a partitioner, and we are only @@ -432,13 +400,13 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt try { // Load error dataType into typeMap - loadDataType(TypeRegistry.ERROR_PREFIX, context); + loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context); // purge event errorSummary.purge(contextWriter, context, value, typeMap); // Set the original file value from the event in the error table - Collection origFiles = errorSummary.getEventFields().get(SEQUENCE_FILE_FIELDNAME); + Collection origFiles = errorSummary.getEventFields().get(FieldHarvester.SEQUENCE_FILE_FIELDNAME); if (!origFiles.isEmpty()) { NDC.push(origFiles.iterator().next()); reprocessedNDCPush = true; @@ -468,7 +436,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt // Add the list of handlers with the ALL specified handlers List> handlers = new ArrayList<>(); handlers.addAll(typeHandlers); - handlers.addAll(loadDataType(TypeRegistry.ALL_PREFIX, context)); + handlers.addAll(loadDataTypeHandlers(TypeRegistry.ALL_PREFIX, context)); // Always include any event errors in the counters for (String error : value.getErrors()) { @@ -481,7 +449,7 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt handlers.clear(); if (!value.ignorableError()) { // since this is not an ignorable error, lets add the error handlers back into the list - handlers.addAll(loadDataType(TypeRegistry.ERROR_PREFIX, context)); + handlers.addAll(loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context)); getCounter(context, IngestInput.EVENT_FATAL_ERROR).increment(1); getCounter(context, IngestInput.EVENT_FATAL_ERROR.name(), "ValidationError").increment(1); @@ -500,42 +468,12 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt // Rollback anything written for this event contextWriter.rollback(); - // Fail job on constraint violations - if (e instanceof ConstraintChecker.ConstraintViolationException) { - throw ((RuntimeException) e); - } + failJobOnConstraintViolations(e); // ensure they know we are still working on it context.progress(); - // log error - log.error("Runtime exception processing event", e); - - // now lets dump to the errors table - // first set the exception on the event if not a field normalization error in which case the fields contain the errors - if (!(e instanceof FieldNormalizationError)) { - value.setAuxData(e); - } - for (DataTypeHandler handler : loadDataType(TypeRegistry.ERROR_PREFIX, context)) { - if (log.isTraceEnabled()) - log.trace("executing handler: " + handler.getClass().getName()); - try { - 
executeHandler(key, value, fields, handler, context); - context.progress(); - } catch (Exception e2) { - // This is a real bummer, we had a critical exception attempting to throw the event into the error table. - // lets terminate this job - log.error("Failed to process error data handlers for an event", e2); - throw new IOException("Failed to process error data handlers for an event", e2); - } - } - - // now create some counters - getCounter(context, IngestProcess.RUNTIME_EXCEPTION).increment(1); - List exceptions = getExceptionSynopsis(e); - for (String exception : exceptions) { - getCounter(context, IngestProcess.RUNTIME_EXCEPTION.name(), exception).increment(1); - } + handleProcessingError(key, value, context, fields, e); } finally { // Remove ORIG_FILE from NDC that was populated by reprocessing events from the error tables if (reprocessedNDCPush) { @@ -546,10 +484,60 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt context.progress(); } + incrementEventCount(value, context); + + updateMetrics(value, eventMapperTimer, fields); + } + + private void failJobOnConstraintViolations(Exception e) { + if (e instanceof ConstraintChecker.ConstraintViolationException) { + throw ((RuntimeException) e); + } + } + + private void handleProcessingError(K1 key, V1 value, Context context, Multimap fields, Exception e) throws IOException { + log.error("Runtime exception processing event", e); + // first set the exception on the event if not a field normalization error in which case the fields contain the errors + if (!(e instanceof FieldHarvester.FieldNormalizationError)) { + value.setAuxData(e); + } + writeToErrorTables(key, value, context, fields); + incrementExceptionCounters(context, e); + } + + private void writeToErrorTables(K1 key, V1 value, Context context, Multimap fields) throws IOException { + // now lets dump to the errors table + for (DataTypeHandler handler : loadDataTypeHandlers(TypeRegistry.ERROR_PREFIX, context)) { + if (log.isTraceEnabled()) + log.trace("executing handler: " + handler.getClass().getName()); + try { + executeHandler(key, value, fields, handler, context); + context.progress(); + } catch (Exception e2) { + // This is a real bummer, we had a critical exception attempting to throw the event into the error table. 
+ // lets terminate this job + log.error("Failed to process error data handlers for an event", e2); + throw new IOException("Failed to process error data handlers for an event", e2); + } + } + } + + private void incrementExceptionCounters(Context context, Exception e) { + // now create some counters + getCounter(context, IngestProcess.RUNTIME_EXCEPTION).increment(1); + List exceptions = getExceptionSynopsis(e); + for (String exception : exceptions) { + getCounter(context, IngestProcess.RUNTIME_EXCEPTION.name(), exception).increment(1); + } + } + + private void incrementEventCount(V1 value, Context context) { getCounter(context, IngestOutput.EVENTS_PROCESSED.name(), value.getDataType().typeName().toUpperCase()).increment(1); offset++; - + } + + private void updateMetrics(V1 value, TraceStopwatch eventMapperTimer, Multimap fields) { if (metricsEnabled && eventMapperTimer != null) { eventMapperTimer.stop(); long timeInEventMapper = eventMapperTimer.elapsed(TimeUnit.MILLISECONDS); @@ -695,27 +683,15 @@ public void processEvent(K1 key, RawRecordContainer value, List entry : getFields(value, handler).entries()) { - // noinspection ThrowableResultOfMethodCallIgnored - if (entry.getValue().getError() != null) { - e = entry.getValue().getError(); - } - fields.put(entry.getKey(), entry.getValue()); - } - if (e != null) { - throw new FieldNormalizationError("Failed getting all fields", e); - } - // Event based metrics - if (metricsEnabled) { - metricsLabels.clear(); - metricsLabels.put("dataType", value.getDataType().typeName()); - - metricsService.collect(Metric.EVENT_COUNT, metricsLabels.get(), fields, 1L); - metricsService.collect(Metric.BYTE_COUNT, metricsLabels.get(), fields, (long) value.getRawData().length); + // populates fields by parsing value and using IngestHelper + fieldHarvester.extractFields(fields, thisHelper, value, offset, splitStart); + if (fieldHarvester.hasError()) { + throw new Exception(fieldHarvester.getException()); } + updateMetrics(value, fields); + previousHelper = thisHelper; } @@ -730,79 +706,32 @@ public void processEvent(K1 key, RawRecordContainer value, List fields) { + // Event based metrics + if (metricsEnabled) { + metricsLabels.clear(); + metricsLabels.put("dataType", value.getDataType().typeName()); + + metricsService.collect(Metric.EVENT_COUNT, metricsLabels.get(), fields, 1L); + metricsService.collect(Metric.BYTE_COUNT, metricsLabels.get(), fields, (long) value.getRawData().length); } } + /** + * Deprecated. Use #fieldHarvester.extractFields() + */ + @Deprecated + // After eliminating this method, expand fieldHarvester.extractFields by eliminating faultTolerantGetEventFields and addSupplementalFields public Multimap getFields(RawRecordContainer value, DataTypeHandler handler) throws Exception { - Multimap newFields; - // Parse the event into its field names and field values using the DataTypeHandler's BaseIngestHelper object. - newFields = handler.getHelper(value.getDataType()).getEventFields(value); - - // Also get the virtual fields, if applicable. 
- if (handler.getHelper(value.getDataType()) instanceof VirtualIngest) { - VirtualIngest vHelper = (VirtualIngest) handler.getHelper(value.getDataType()); - Multimap virtualFields = vHelper.getVirtualFields(newFields); - for (Entry v : virtualFields.entries()) - newFields.put(v.getKey(), v.getValue()); - } - // Also get the composite fields, if applicable - if (handler.getHelper(value.getDataType()) instanceof CompositeIngest) { - CompositeIngest vHelper = (CompositeIngest) handler.getHelper(value.getDataType()); - Multimap compositeFields = vHelper.getCompositeFields(newFields); - for (String fieldName : compositeFields.keySet()) { - // if this is an overloaded composite field, we are replacing the existing field data - if (vHelper.isOverloadedCompositeField(fieldName)) - newFields.removeAll(fieldName); - newFields.putAll(fieldName, compositeFields.get(fieldName)); - } - } - - // Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes - long loadDate = now.get(); - NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate)); - // set an indexed field value for use by the date index data type handler - loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate))); - newFields.put(LOAD_DATE_FIELDNAME, loadDateValue); - - String seqFileName = null; - - // place the sequence filename into the event - if (createSequenceFileName) { - seqFileName = NDC.peek(); - - if (trimSequenceFileName) { - seqFileName = StringUtils.substringAfterLast(seqFileName, "/"); - } - - if (null != seqFileName) { - StringBuilder seqFile = new StringBuilder(seqFileName); - - seqFile.append(SRC_FILE_DEL).append(offset); - - if (null != splitStart) { - seqFile.append(SRC_FILE_DEL).append(splitStart); - } - - newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString())); - } + Multimap fields = fieldHarvester.faultTolerantGetEventFields(value, handler.getHelper(value.getDataType())); + if (fieldHarvester.hasError()) { + throw new Exception(fieldHarvester.getException()); } - - if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) { - newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName())); - } - - // Also if this helper needs to filter the fields before returning, apply now - if (handler.getHelper(value.getDataType()) instanceof FilterIngest) { - FilterIngest fHelper = (FilterIngest) handler.getHelper(value.getDataType()); - fHelper.filter(newFields); + fieldHarvester.addSupplementalFields(value, offset, splitStart, handler.getHelper(value.getDataType()), fields); + if (fieldHarvester.hasError()) { + throw new Exception(fieldHarvester.getException()); } - - return newFields; + return fields; } @SuppressWarnings("unchecked") diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java new file mode 100644 index 00000000000..8893fbeb880 --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldHarvester.java @@ -0,0 +1,255 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import datawave.data.normalizer.DateNormalizer; +import datawave.ingest.data.RawRecordContainer; +import 
datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValue; +import datawave.ingest.data.config.ingest.CompositeIngest; +import datawave.ingest.data.config.ingest.FilterIngest; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.VirtualIngest; +import datawave.ingest.time.Now; +import datawave.util.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import java.util.Date; +import java.util.Map; + +public class FieldHarvester { + private static final Logger log = Logger.getLogger(FieldHarvester.class); + + private static Now now = Now.getInstance(); + + public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE"; + public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE"; + public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename"; + public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename"; + public static final String RAW_FILE_FIELDNAME = "RAW_FILE"; + public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename"; + + private boolean createSequenceFileName; + private boolean trimSequenceFileName; + private boolean createRawFileName; + private final DateNormalizer dateNormalizer = new DateNormalizer(); + + private static final String SRC_FILE_DEL = "|"; + private Exception exception; + + public FieldHarvester(Configuration configuration) { + this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true); + this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true); + this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true); + } + + public boolean hasError() { + return null != this.exception; + } + + public Exception getException() { + return this.exception; + } + + /** + * Updates "fields" with extracted, derived, and automatically generated fields. + * + * @param fields + * the Multimap to modify with extracted and generated fields + * @param ingestHelper + * interface to use for field extraction + * @param value + * the record from which the fields will be extracted + * @param offset + * record offset within the source file + * @param splitStart + * the splitStart for the record + */ + public void extractFields(Multimap fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset, + String splitStart) { + // reset exception-in-extraction tracking + this.exception = null; + + // "candidateFields" holds the fields that may eventually be added to "fields" + Multimap candidateFields = null; + try { + // get salvaged fields if getEventFields throws exception + candidateFields = faultTolerantGetEventFields(value, ingestHelper); + + // try adding supplemental fields to candidateFields, whether or not there was an exception + addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields); + } catch (Exception exception) { + this.exception = exception; + } finally { + // Add each "candidateFields" field value to "fields" as long as the field value is without error + addErrorFreeFields(fields, candidateFields); + } + } + + /** + * Calls IngestHelper.getEventFields with value. If an exception is thrown, captures it and attempts to salvage fields from the value. 
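+ * Because the exception is captured rather than rethrown, callers are expected to check for it afterwards, following the
+ * pattern EventMapper.getFields uses (shown here only as a sketch):
+ * {@code fields = harvester.faultTolerantGetEventFields(value, helper); if (harvester.hasError()) throw new Exception(harvester.getException()); }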
+ */ + // package method visibility for EventMapper.getFields only + @Deprecated + Multimap faultTolerantGetEventFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { + try { + // Parse the event into its candidate field names and values using the IngestHelperInterface. + return ingestHelper.getEventFields(value); + } catch (Exception exception) { + // delay throwing the exception + this.exception = exception; + return attemptToSalvageFields(value, ingestHelper); + } + } + + // todo test case where salvage fields are empty + // package method visibility for EventMapper.getFields only + void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper, + Multimap fields) { + addVirtualFields(ingestHelper, fields); + addCompositeFields(ingestHelper, fields); + addLoadDateField(fields); + addFileNameFields(value, offset, splitStart, fields); + applyFieldFilters(ingestHelper, fields); + } + + /* + * Populate the "fields" method parameter with any candidateFields that do not have an Error + */ + private void addErrorFreeFields(Multimap fields, Multimap candidateFields) { + if (null == candidateFields) { + return; + } + Throwable fieldError = null; + for (Map.Entry entry : candidateFields.entries()) { + // noinspection ThrowableResultOfMethodCallIgnored + if (entry.getValue().getError() != null) { + fieldError = entry.getValue().getError(); + } + fields.put(entry.getKey(), entry.getValue()); + } + if (fieldError != null) { + this.exception = new FieldNormalizationError("Failed getting all fields", fieldError); + } + } + + private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also get the virtual fields, if applicable. + if (ingestHelper instanceof VirtualIngest) { + VirtualIngest vHelper = (VirtualIngest) ingestHelper; + Multimap virtualFields = vHelper.getVirtualFields(newFields); + for (Map.Entry v : virtualFields.entries()) + newFields.put(v.getKey(), v.getValue()); + } + } + + private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also get the composite fields, if applicable + if (ingestHelper instanceof CompositeIngest) { + CompositeIngest vHelper = (CompositeIngest) ingestHelper; + Multimap compositeFields = vHelper.getCompositeFields(newFields); + for (String fieldName : compositeFields.keySet()) { + // if this is an overloaded composite field, we are replacing the existing field data + if (vHelper.isOverloadedCompositeField(fieldName)) { + newFields.removeAll(fieldName); + } + newFields.putAll(fieldName, compositeFields.get(fieldName)); + } + } + } + + private void addLoadDateField(Multimap newFields) { + // Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes + long loadDate = now.get(); + NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate)); + // set an indexed field value for use by the date index data type handler + loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate))); + newFields.put(LOAD_DATE_FIELDNAME, loadDateValue); + } + + private void addRawFileField(RawRecordContainer value, Multimap newFields, String seqFileName) { + if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) { + newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName())); + } + } + + private void addOrigFileField(Multimap 
newFields, long offset, String splitStart, String seqFileName) { + if (null != seqFileName) { + StringBuilder seqFile = new StringBuilder(seqFileName); + + seqFile.append(SRC_FILE_DEL).append(offset); + + if (null != splitStart) { + seqFile.append(SRC_FILE_DEL).append(splitStart); + } + + newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString())); + } + } + + private String getSeqFileName() { + String seqFileName; + seqFileName = NDC.peek(); + + if (trimSequenceFileName) { + seqFileName = StringUtils.substringAfterLast(seqFileName, "/"); + } + return seqFileName; + } + + private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap newFields) { + String seqFileName = null; + + if (createSequenceFileName) { + seqFileName = getSeqFileName(); + + // place the sequence filename into the event + addOrigFileField(newFields, offset, splitStart, seqFileName); + } + + addRawFileField(value, newFields, seqFileName); + } + + private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap newFields) { + // Also if this helper needs to filter the fields before returning, apply now + if (ingestHelper instanceof FilterIngest) { + FilterIngest fHelper = (FilterIngest) ingestHelper; + fHelper.filter(newFields); + } + } + + /** + * If IngestHelper implements FieldSalvager, get the salvageable fields from value. Otherwise, return an empty Multimap. + */ + private Multimap attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) { + // If this helper is able, attempt to salvage a subset of the fields + if (null != ingestHelper && ingestHelper instanceof FieldSalvager) { + FieldSalvager salvager = (FieldSalvager) ingestHelper; + try { + Multimap salvagedFields = salvager.getSalvageableEventFields(value); + if (null != salvagedFields) { + return salvagedFields; + } + } catch (Exception salvagerException) { + // Do not overwrite the original exception + if (null == this.exception) { + this.exception = new IllegalStateException("Unexpected state (FieldExpander.exception should be non-null if salvaging", salvagerException); + } else { + // allow original exception (this.exception) to be thrown by caller + log.error("Even salvager threw an exception", salvagerException); + } + } + } + return HashMultimap.create(); + } + + public static class FieldNormalizationError extends Exception { + private static final long serialVersionUID = 1L; + + public FieldNormalizationError(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java new file mode 100644 index 00000000000..0f6d07a2e2c --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/FieldSalvager.java @@ -0,0 +1,24 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.config.NormalizedContentInterface; + +/** + * This optional interface is intended to complement the IngestHelperInterface interface's handling of errors that occur within ingest jobs. + * + * One use case is when IngestHelperInterface's getEventFields throws an exception. The getEventFields method will not return a Multimap of field values + * (because it instead threw an exception). 
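+ *
+ * A minimal sketch of an implementation (hypothetical class and field name, shown only to illustrate the contract; see the
+ * SalvagingDataTypeHandler helper in EventMapperSalvageFieldsOnErrorTest for a fuller example):
+ *
+ * <pre>
+ * public class RawSizeSalvager implements FieldSalvager {
+ *     public Multimap&lt;String, NormalizedContentInterface&gt; getSalvageableEventFields(RawRecordContainer record) {
+ *         Multimap&lt;String, NormalizedContentInterface&gt; fields = HashMultimap.create();
+ *         // only report values that are trivially derived and cannot throw during normalization
+ *         fields.put("ORIGINAL_SIZE", new BaseNormalizedContent("ORIGINAL_SIZE", Long.toString(record.getRawData().length)));
+ *         return fields;
+ *     }
+ * }
+ * </pre>
+ *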
Prior to FieldSalvager, this meant that the error tables would not have information on any of the + * RawRecordContainer's field values. + * + * FieldSalvager implementations can attempt to provide a subset of the field values, so that the error tables can have more helpful information about the + * failed record, perhaps aiding troubleshooting efforts. An implementation could return only those field names that are relatively well-structured and + * predictably formatted, very unlikely to cause exceptions while processing. + */ +public interface FieldSalvager { + /** + * @param rawRecordContainer + * @return Multimap containing subset of field values, possibly empty but not null + */ + Multimap getSalvageableEventFields(RawRecordContainer rawRecordContainer); +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 050d12772dc..6cdafac59c5 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -20,7 +20,7 @@ import datawave.ingest.data.config.GroupedNormalizedContentInterface; import datawave.ingest.data.config.NormalizedContentInterface; import datawave.ingest.data.config.ingest.IngestHelperInterface; -import datawave.ingest.mapreduce.EventMapper; +import datawave.ingest.mapreduce.FieldHarvester; import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle; import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition; @@ -500,7 +500,7 @@ public long process(KEYIN key, RawRecordContainer event, Multimap loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME); + Collection loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME); if (!loadDates.isEmpty()) { NormalizedContentInterface nci = loadDates.iterator().next(); Date date = new Date(Long.parseLong(nci.getEventFieldValue())); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java index dde9b6dcb7e..865d52b9297 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/RawRecordContainerImplTest.java @@ -19,6 +19,7 @@ import datawave.ingest.config.RawRecordContainerImpl; import datawave.ingest.data.config.MarkingsHelper; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Before; import org.junit.Test; @@ -53,8 +54,7 @@ public void setUp() throws Exception { conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName()); conf.set("samplecsv.reader.class", TestCSVReader.class.getName()); conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE"); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); dataType = TypeRegistry.getType("samplecsv"); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java index c116d4182d8..2a7b325dc50 100644 --- 
a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/DataTypeHelperImplTest.java @@ -1,8 +1,7 @@ package datawave.ingest.data.config; -import datawave.ingest.data.TypeRegistry; - import datawave.policy.IngestPolicyEnforcer; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Before; @@ -25,8 +24,7 @@ public void setup() { @Test(expected = IllegalArgumentException.class) public void testInvalidConfig() { DataTypeHelperImpl helper = new DataTypeHelperImpl(); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); helper.setup(conf); } @@ -36,8 +34,7 @@ public void testValidConfig() throws Exception { Assert.assertNotNull(configStream); conf.addResource(configStream); Assert.assertThat(conf.get("data.name"), is("fake")); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); DataTypeHelperImpl helper = new DataTypeHelperImpl(); helper.setup(conf); @@ -54,8 +51,7 @@ public void testDowncaseFields() throws Exception { Assert.assertNotNull(configStream); conf.addResource(configStream); conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR"); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); DataTypeHelperImpl helper = new DataTypeHelperImpl(); helper.setup(conf); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java index 82b3a3385e9..5ebef6b0992 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/NormalizedFieldAndValueTest.java @@ -21,7 +21,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface { private Map _markings; private Throwable _error; - protected NonGroupedInstance() { + public NonGroupedInstance() { _fieldName = "TestNonGroupedInstance"; _indexedFieldName = "TestIndexedField"; @@ -141,7 +141,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface private String _group; private String _subGroup; - protected GroupedInstance() { + public GroupedInstance() { _fieldName = "TestNonGroupedInstance"; _indexedFieldName = "TestIndexedField"; diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java index b34937d2c88..245e7486519 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/FieldNameAliaserNormalizerTest.java @@ -7,6 +7,7 @@ import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.DataTypeHelper.Properties; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Before; @@ -21,10 +22,7 @@ public void setup() { conf = new Configuration(); conf.set(Properties.DATA_NAME, "test"); conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName()); - - 
TypeRegistry.reset(); - TypeRegistry.getInstance(conf); - + TypeRegistryTestSetup.resetTypeRegistry(conf); } @Test(expected = IllegalArgumentException.class) diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java new file mode 100644 index 00000000000..3966bb8c9a4 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/ingest/MinimalistIngestHelperInterfaceImpl.java @@ -0,0 +1,246 @@ +package datawave.ingest.data.config.ingest; + +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.config.DataTypeHelperImpl; +import datawave.ingest.data.config.MaskedFieldHelper; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.policy.IngestPolicyEnforcer; +import org.apache.hadoop.conf.Configuration; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +// For testing only, this implementation fulfills the interface contract with methods that throw UnsupportedOperationException. +// It is is extendable when implementations are needed for specific tests. +public class MinimalistIngestHelperInterfaceImpl implements IngestHelperInterface { + @Override + public Type getType() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public IngestPolicyEnforcer getPolicyEnforcer() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Configuration conf) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Set getShardExclusions() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShardExcluded(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getEventFields(RawRecordContainer value) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap normalizeMap(Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap normalize(Multimap fields) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public List> getDataTypes(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public String getNormalizedMaskedValue(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMappings() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public String get(String key) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean getDeleteMode() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean getReplaceMalformedUTF8() { + // override this method, as needed + throw new 
UnsupportedOperationException(); + } + + @Override + public boolean isEmbeddedHelperMaskedFieldHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public MaskedFieldHelper getEmbeddedHelperAsMaskedFieldHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public DataTypeHelperImpl getEmbeddedHelper() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReverseIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isIndexOnlyField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addShardExclusionField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addReverseIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addIndexOnlyField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCompositeField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOverloadedCompositeField(String fieldName) { + return true; + } + + @Override + public boolean isNormalizedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public void addNormalizedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAliasedIndexField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public HashSet getAliasesForIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDataTypeField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFieldDefinitions() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Map getCompositeFieldSeparators() { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean isVirtualIndexedField(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public Map getVirtualNameAndIndex(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldHaveBeenIndexed(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); + } + + @Override + public boolean shouldHaveBeenReverseIndexed(String fieldName) { + // override this method, as needed + throw new UnsupportedOperationException(); 
+ } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java index acc39e28839..ec8e5f12b5f 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/normalizer/AbstractNormalizerTest.java @@ -7,6 +7,7 @@ import datawave.data.normalizer.NormalizationException; +import datawave.util.TypeRegistryTestSetup; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Before; @@ -32,8 +33,7 @@ public String convertFieldRegex(String fieldName, String fieldRegex) { public void setUp() { Configuration conf = new Configuration(); conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName()); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); normalizer.setup(TypeRegistry.getType("test"), "test", conf); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java new file mode 100644 index 00000000000..bc09d9eb244 --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperSalvageFieldsOnErrorTest.java @@ -0,0 +1,233 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.config.BaseNormalizedContent; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.ingest.ContentBaseIngestHelper; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.writer.ContextWriter; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.easymock.EasyMockRule; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This test class aims to test only the scenarios in which EventMapper encounters an error, to verify its behavior with the FieldSalvager interface. It's + * modeled after EventMapperTest. 
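+ * Every IngestHelper used here throws from getEventFields, so any fields that reach the error tables must have come from
+ * the FieldSalvager implementation supplied by the data type handler under test.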
+ */ +public class EventMapperSalvageFieldsOnErrorTest { + private static final String[] SALVAGED_FIELDS = {"ISBN", "author", "borrower", "dueDate", "libraryBranch"}; + + @Rule + public EasyMockRule easyMockRule = new EasyMockRule(this); + + @Mock + private Mapper.Context mapContext; + + private Configuration conf; + private SimpleRawRecord record; + private EventMapper eventMapper; + + public void setupTest(String dataTypeHandler) throws Exception { + eventMapper = new EventMapper<>(); + + conf = new Configuration(); + conf.setClass(EventMapper.CONTEXT_WRITER_CLASS, TestContextWriter.class, ContextWriter.class); + EventMapperTest.setupMapContextMock(mapContext, conf); + + record = IngestTestSetup.createRecord(dataTypeHandler, conf); + } + + /** + * FakeSalvagingIngestHelper: - always throws an exception when getEventFields is called, to ensure error handling code path is reached within EventMapper. + * - allows for anonymous inline helper creation. + */ + public static abstract class FakeSalvagingIngestHelper extends ContentBaseIngestHelper implements FieldSalvager { + @Override + public Multimap getEventFields(RawRecordContainer value) { + throw new RuntimeException("Simulated exception while getting event fields for value."); + } + } + + /** + * SalvagingDataTypeHandler provides a FieldSalvager implementation that deserializes rawData as a Map<String, String>, then returns a Multimap + * containing only the SALVAGED_FIELDS that were encountered, skipping all others. + */ + public static class SalvagingDataTypeHandler extends SimpleDataTypeHandler { + @Override + public IngestHelperInterface getHelper(Type datatype) { + FakeSalvagingIngestHelper fakeSalvagingIngestHelper = new FakeSalvagingIngestHelper() { + @Override + public Multimap getSalvageableEventFields(RawRecordContainer value) { + HashMultimap fields = HashMultimap.create(); + byte[] rawData = value.getRawData(); + + try (ByteArrayInputStream bytArrayInputStream = new ByteArrayInputStream(rawData); + ObjectInputStream objectInputStream = new ObjectInputStream(bytArrayInputStream);) { + Map deserializedData = (Map) objectInputStream.readObject(); + for (String fieldToSalvage : SALVAGED_FIELDS) { + String fieldValue = deserializedData.get(fieldToSalvage); + if (null != fieldValue) { + try { + fields.put(fieldToSalvage, new BaseNormalizedContent(fieldToSalvage, fieldValue)); + } catch (Exception fieldException) { + // skip this field and proceed to the next + } + } + } + } catch (Exception e) { + return fields; + } + return fields; + } + + @Override + public Multimap getEventFields(RawRecordContainer value) { + throw new RuntimeException("Simulated exception while getting event fields for value."); + } + }; + return fakeSalvagingIngestHelper; + } + } + + /** + * NullSalvagingSimpleDataTypeHandler provides a FieldSalvager implementation that always returns null. 
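+ * FieldHarvester.attemptToSalvageFields is expected to treat a null return as nothing salvaged, substituting an empty
+ * Multimap rather than failing.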
+ */ + public static class NullSalvagingSimpleDataTypeHandler extends SimpleDataTypeHandler { + @Override + public IngestHelperInterface getHelper(Type datatype) { + return new FakeSalvagingIngestHelper() { + @Override + public Multimap getSalvageableEventFields(RawRecordContainer value) { + return null; + } + }; + } + } + + @After + public void checkMock() { + verify(mapContext); + } + + @Test + public void shouldSalvageAllFields() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Add both salvageable and unsalvageable fields as rawData + HashMap fieldValues = createMapOfSalveagableFieldValues(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect only the salvageable fields, each exactly once + assertEquals(written.toString(), SALVAGED_FIELDS.length, written.size()); + Map fieldNameOccurrences = countPersistedFieldsByName(written); + for (String expectedFieldName : SALVAGED_FIELDS) { + assertTrue(fieldNameOccurrences.toString(), fieldNameOccurrences.containsKey(expectedFieldName)); + assertEquals(1, (int) fieldNameOccurrences.get(expectedFieldName)); + } + } + + @Test + public void shouldTolerateNullSalvagedFieldsMap() throws Exception { + // Use a DataTypeHandler that provides a FieldSalvager that always returns null + setupTest(NullSalvagingSimpleDataTypeHandler.class.getName()); + + // Create a record with salvageable and unsalvageable fields + HashMap fieldValues = createMapOfSalveagableFieldValues(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect nothing to be salvaged + assertEquals(written.toString(), 0, written.size()); + } + + @Test + public void shouldIgnoreNonSalvagedFields() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Add only unsalvagable fields + HashMap fieldValues = new HashMap<>(); + addUnsalvageableFieldsToMap(fieldValues); + record.setRawData(IngestTestSetup.objectToRawBytes(fieldValues)); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to occur once + assertEquals(written.toString(), 0, written.size()); + } + + @Test + public void shouldTolerateErrorInSalvager() throws Exception { + setupTest(SalvagingDataTypeHandler.class.getName()); + + // Set raw data to an invalid format, causing an error in the FieldSalvager implementation + record.setRawData("Not a map".getBytes()); + + runMapper(); // will throw error, calling ErrorDataTypeHandler. 
See FakeSalvagingIngestHelper.getEventFields + Multimap written = TestContextWriter.getWritten(); + + // Expect all of the salvageable fields to occur once + assertEquals(written.toString(), 0, written.size()); + } + + private void runMapper() throws IOException, InterruptedException { + eventMapper.setup(mapContext); + eventMapper.map(new LongWritable(1), record, mapContext); + eventMapper.cleanup(mapContext); + } + + private HashMap createMapOfSalveagableFieldValues() { + HashMap fieldValues = new HashMap<>(); + fieldValues.put("ISBN", "0-123-00000-1"); + fieldValues.put("dueDate", "20211126"); + fieldValues.put("author", "Henry Hope Reed"); + fieldValues.put("borrower", "Edward Clark Potter"); + fieldValues.put("libraryBranch", "Grand Central"); + return fieldValues; + } + + private void addUnsalvageableFieldsToMap(HashMap fieldValues) { + fieldValues.put("genre", "FICTION"); + fieldValues.put("format", "Hardcover"); + } + + /** + * Extracts field names from persisted data, creating a map containing the number of occurrences per field name. + */ + private Map countPersistedFieldsByName(Multimap entries) { + Map result = new HashMap<>(); + for (BulkIngestKey key : entries.keys()) { + String fieldName = key.getKey().getColumnFamily().toString(); + // create or increment + result.merge(fieldName, 1, Integer::sum); + } + return result; + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java index a8dfc3d1586..bbed1a6e19a 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/EventMapperTest.java @@ -12,6 +12,7 @@ import datawave.ingest.mapreduce.job.metrics.MetricsConfiguration; import datawave.ingest.mapreduce.job.metrics.TestEventCountMetricsReceiver; import datawave.ingest.mapreduce.job.writer.ContextWriter; +import datawave.util.TypeRegistryTestSetup; import org.apache.accumulo.core.data.Value; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.Map; +// Also see EventMapperSalvageFieldsOnErrorTest public class EventMapperTest { @Rule @@ -51,12 +53,11 @@ public void setUp() throws Exception { conf = new Configuration(); conf.setClass(EventMapper.CONTEXT_WRITER_CLASS, TestContextWriter.class, ContextWriter.class); - Type type = new Type("file", null, null, new String[] {SimpleDataTypeHandler.class.getName()}, 10, null); - Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, new String[] {SimpleDataTypeHandler.class.getName()}, 20, null); + String[] defaultDataTypeHandlers = {SimpleDataTypeHandler.class.getName()}; - TypeRegistry registry = TypeRegistry.getInstance(conf); - registry.put(type.typeName(), type); - registry.put(errorType.typeName(), errorType); + Type type = new Type("file", null, null, defaultDataTypeHandlers, 10, null); + Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, defaultDataTypeHandlers, 20, null); + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type, errorType); Multimap fields = HashMultimap.create(); fields.put("fileExtension", new BaseNormalizedContent("fileExtension", "gz")); @@ -64,25 +65,18 @@ public void setUp() throws Exception { SimpleDataTypeHelper.registerFields(fields); - record = new SimpleRawRecord(); - record.setRawFileTimestamp(eventTime); - record.setDataType(type); - 
record.setDate(eventTime); - record.setRawFileName("/some/filename"); - record.setRawData("some data".getBytes()); - record.generateId(null); + record = IngestTestSetup.createBasicRecord(eventTime, type); - errorRecord = new SimpleRawRecord(); + errorRecord = IngestTestSetup.createBasicRecord(eventTime, type); errorRecord.setRawFileTimestamp(0); - errorRecord.setDataType(type); - errorRecord.setDate(eventTime); - errorRecord.setRawFileName("/some/filename"); - errorRecord.setRawData("some data".getBytes()); - errorRecord.generateId(null); errorRecord.setRawFileName(""); errorRecord.addError("EVENT_DATE_MISSING"); errorRecord.setFatalError(true); + EventMapperTest.setupMapContextMock(mapContext, conf); + } + + static void setupMapContextMock(Mapper.Context mapContext, Configuration conf) throws IOException, InterruptedException { expect(mapContext.getConfiguration()).andReturn(conf).anyTimes(); mapContext.progress(); @@ -209,7 +203,7 @@ private Map.Entry getMetric(Multimap w } private Map.Entry getRawFileName(Multimap written) { - return getFieldEntry(written, EventMapper.RAW_FILE_FIELDNAME.toString()); + return getFieldEntry(written, FieldHarvester.RAW_FILE_FIELDNAME.toString()); } private Map.Entry getFieldEntry(Multimap written, String field) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java new file mode 100644 index 00000000000..0b65036a5bc --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/FieldHarvesterTest.java @@ -0,0 +1,464 @@ +package datawave.ingest.mapreduce; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValueTest; +import datawave.ingest.data.config.ingest.CompositeIngest; +import datawave.ingest.data.config.ingest.IngestHelperInterface; +import datawave.ingest.data.config.ingest.MinimalistIngestHelperInterfaceImpl; +import datawave.ingest.data.config.ingest.VirtualIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.NDC; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Map; + +public class FieldHarvesterTest { + private static final String SAMPLE_FIELD_NAME = "SAMPLE_FIELD_NAME"; + private static final String COMPOSITE_FIELD = "COMPOSITE_FIELD"; + private static final String VIRTUAL_FIELD = "VIRTUAL_FIELD"; + private static final String SEQ_FILENAME_ONLY = "InputFile.seq"; + private static final String SEQ_FILENAME_AND_DIR = "/input/directory/" + SEQ_FILENAME_ONLY; + private static final int NUM_SUPPLEMENTAL_FIELDS = 3; + private static final String LOAD_DATE = "LOAD_DATE"; + private static final String ORIG_FILE = "ORIG_FILE"; + private static final String RAW_FILE = "RAW_FILE"; + + private final Multimap fields = HashMultimap.create(); + private FieldHarvester fieldHarvester; + private long offset = 0; + private String splitStart = null; + private RawRecordContainer value; + + @Before + public void before() { + value = IngestTestSetup.createBasicRecord(); + value.setRawData(null); // rawData is ignored by below implementations of getEventFields + fieldHarvester = new FieldHarvester(new Configuration()); + NDC.push(SEQ_FILENAME_AND_DIR); + } + + @After + public void 
after() { + NDC.pop(); + } + + @Test + public void reusableFieldHarvester() { + // The first call to extractFields produces an error, adding only supplemental fields + fieldHarvester.extractFields(fields, null, value, offset, splitStart); + + // Verify error is captured (NullPointerException because null provided as the IngestHelperInterface param) + assertExceptionCaptured(fieldHarvester, NullPointerException.class); + assertContainsOnlySupplementalFields(); + + // The second extractFields calls an IngestHelper that doesn't error + // There should be no residue from the prior call (prior errors cleared, prior fields cleared) + // field map with single field and value + fields.clear(); + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + assertNoErrors(fieldHarvester); + + // The third call is just like the first call + fields.clear(); + fieldHarvester.extractFields(fields, null, value, offset, splitStart); + + // Verify error is captured (NullPointerException because null provided as IngestHelperInterface) + assertExceptionCaptured(fieldHarvester, NullPointerException.class); + assertContainsOnlySupplementalFields(); + } + + @Test + public void disableSeqFileNameCreation() { + // Configuration disables seq file name creation + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.LOAD_SEQUENCE_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(RAW_FILE)); + // excluded due to config + Assert.assertFalse(fields.containsKey(ORIG_FILE)); + Assert.assertEquals(fields.toString(), 3, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void disableTrimmingSeqFileName() { + // Configuration disables trimming of seq file name + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.TRIM_SEQUENCE_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + Collection result = fields.get(ORIG_FILE); + NormalizedContentInterface fieldValue = result.iterator().next(); + Assert.assertEquals(SEQ_FILENAME_AND_DIR + "|0", fieldValue.getEventFieldValue()); + Assert.assertEquals(SEQ_FILENAME_AND_DIR + "|0", fieldValue.getIndexedFieldValue()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void enableTrimmingSeqFileName() { + // Default configuration enables trimming of seq file name + + // field map with single field and value + fieldHarvester.extractFields(fields, new 
BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + Collection result = fields.get(ORIG_FILE); + NormalizedContentInterface fieldValue = result.iterator().next(); + Assert.assertEquals(SEQ_FILENAME_ONLY + "|0", fieldValue.getEventFieldValue()); + Assert.assertEquals(SEQ_FILENAME_ONLY + "|0", fieldValue.getIndexedFieldValue()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void disableRawFileName() { + // Configuration disables raw file name creation + Configuration config = new Configuration(); + config.setBoolean(FieldHarvester.LOAD_RAW_FILE_NAME, false); + FieldHarvester fieldHarvester = new FieldHarvester(config); + + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(ORIG_FILE)); + // excluded due to config + Assert.assertFalse(fields.containsKey(RAW_FILE)); + Assert.assertEquals(fields.toString(), 3, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void addsVirtualFields() { + // Ensure that a virtual field is added + Multimap eventFields = createOneFieldMultiMap(); + IngestHelperInterface ingestHelper = new BasicWithVirtualFieldsIngestHelper(eventFields); + + // field map with single field and value + fieldHarvester.extractFields(this.fields, ingestHelper, value, offset, splitStart); + + // Verify field returned + Assert.assertTrue(this.fields.containsKey(SAMPLE_FIELD_NAME)); + // Verify field is used for virtual field creation + Assert.assertTrue(this.fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD)); + assertContainsSupplementalFields(this.fields); + Assert.assertEquals(this.fields.toString(), 5, this.fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void addsCompositeFields() { + // Ensure that a composite field is added + Multimap eventFields = createOneFieldMultiMap(); + IngestHelperInterface ingestHelper = new BasicWithCompositeFieldsIngestHelper(eventFields); + + // field map with single field and value + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + + // Verify field returned + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + // Verify field is used for composite + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + COMPOSITE_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 5, fields.size()); + + // Verify there was no exception + assertNoErrors(fieldHarvester); + } + + @Test + public void supplementsSalvagedFields() { + // cause exception in getEventFields, get salvaged fields and ensure they're used for virtual and composite + Multimap salvagableFields = createOneFieldMultiMap(); + ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(salvagableFields); + + // field map with single field and value + fieldHarvester.extractFields(fields, ingestHelper, value, offset, 
splitStart); + + // Verify salvaged fields returned + // Verify salvaged fields are used for virtual, composite + // Note: the virtual field output is also used by the composite field implementation + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + COMPOSITE_FIELD)); + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME + VIRTUAL_FIELD + COMPOSITE_FIELD)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), 7, fields.size()); + + // Verify an exception was detected (for getEventFields) + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void emptySalvagedFields() { + // cause exception in getEventFields, causing retrieval of empty multimap of salvaged fields + ErroringSalvagableIngestHelper ingestHelper = new ErroringSalvagableIngestHelper(HashMultimap.create()); + + // field map with empty fields + fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart); + + // empty salvaged fields + assertContainsOnlySupplementalFields(); + + // Verify an exception was detected (for getEventFields) + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void doubleException() { + // exception in getEventFields and in salvager + fieldHarvester.extractFields(fields, new DoubleErrorIngestHelper(), value, offset, splitStart); + + // Verify it contains expected fields + assertContainsOnlySupplementalFields(); + + // Verify the original exception was captured + assertExceptionCaptured(this.fieldHarvester, UnsupportedOperationException.class); + } + + @Test + public void extractFields() { + // field map with single field and value + fieldHarvester.extractFields(fields, new BasicIngestHelper(createOneFieldMultiMap()), value, offset, splitStart); + + // Verify it contains expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + assertNoErrors(fieldHarvester); + } + + @Test + public void erroredFieldExcluded() { + // create a field containing a field error + NormalizedFieldAndValueTest.NonGroupedInstance fieldWithError = new NormalizedFieldAndValueTest.NonGroupedInstance(); + fieldWithError.setError(new Exception()); + + // field map contains a field containing a field error + Multimap multiMap = HashMultimap.create(); + multiMap.put(SAMPLE_FIELD_NAME, fieldWithError); + + fieldHarvester.extractFields(fields, new BasicIngestHelper(multiMap), this.value, offset, splitStart); + + // Verify the field map contains the expected fields + Assert.assertTrue(fields.containsKey(SAMPLE_FIELD_NAME)); + assertContainsSupplementalFields(fields); + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS + 1, fields.size()); + + // Verify there was a FieldNormalizationError due to the field error + assertExceptionCaptured(this.fieldHarvester, FieldHarvester.FieldNormalizationError.class); + } + + @Test + public void nullIngestHelper() { + // field map with single field and value + fieldHarvester.extractFields(fields, null, value, offset, splitStart); + + // Verify it contains expected fields + assertContainsOnlySupplementalFields(); + + // Verify it captured the null pointer exception + assertExceptionCaptured(this.fieldHarvester, NullPointerException.class); + } + + private void 
assertNoErrors(FieldHarvester fieldHarvester) { + Assert.assertFalse("Unexpected exception: " + fieldHarvester.getException(), fieldHarvester.hasError()); + Assert.assertNull(fieldHarvester.getException()); + } + + private void assertExceptionCaptured(FieldHarvester fieldHarvester, Class exceptionClass) { + Assert.assertTrue(fieldHarvester.hasError()); + Assert.assertEquals(exceptionClass, fieldHarvester.getException().getClass()); + } + + private Multimap createOneFieldMultiMap() { + Multimap multiMap = HashMultimap.create(); + multiMap.put(SAMPLE_FIELD_NAME, new NormalizedFieldAndValueTest.NonGroupedInstance()); + return multiMap; + } + + private void assertContainsSupplementalFields(Multimap fields) { + Assert.assertTrue(fields.containsKey(LOAD_DATE)); + Assert.assertTrue(fields.containsKey(ORIG_FILE)); + Assert.assertTrue(fields.containsKey(RAW_FILE)); + } + + private void assertContainsOnlySupplementalFields() { + assertContainsSupplementalFields(fields); + // and only supplemental + Assert.assertEquals(fields.toString(), NUM_SUPPLEMENTAL_FIELDS, fields.size()); + } + + private static class DoubleErrorIngestHelper extends MinimalistIngestHelperInterfaceImpl implements FieldSalvager { + @Override + public Multimap getSalvageableEventFields(RawRecordContainer rawRecordContainer) { + throw new RuntimeException(); + } + } + + private static class BasicIngestHelper extends MinimalistIngestHelperInterfaceImpl { + private final Multimap multiMap; + + public BasicIngestHelper(Multimap multiMap) { + this.multiMap = multiMap; + } + + public Multimap getEventFields(RawRecordContainer event) { + return multiMap; + } + } + + private static class BasicWithCompositeFieldsIngestHelper extends BasicIngestHelper implements CompositeIngest { + public BasicWithCompositeFieldsIngestHelper(Multimap multiMap) { + super(multiMap); + } + + @Override + public void setCompositeFieldDefinitions(Multimap compositeFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + COMPOSITE_FIELD, entry.getValue()); + } + return compositeFields; + } + } + + private static class BasicWithVirtualFieldsIngestHelper extends BasicIngestHelper implements VirtualIngest { + public BasicWithVirtualFieldsIngestHelper(Multimap multiMap) { + super(multiMap); + } + + @Override + public Map getVirtualFieldDefinitions() { + return null; + } + + @Override + public void setVirtualFieldDefinitions(Map virtualFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultVirtualFieldSeparator() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDefaultVirtualFieldSeparator(String separator) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getVirtualFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + VIRTUAL_FIELD, entry.getValue()); + } + return compositeFields; + } + } + + private static class ErroringSalvagableIngestHelper extends MinimalistIngestHelperInterfaceImpl implements VirtualIngest, CompositeIngest, FieldSalvager { + private final Multimap multiMap; + + ErroringSalvagableIngestHelper(Multimap multiMap) { + this.multiMap = multiMap; + } + + @Override + public Multimap 
getSalvageableEventFields(RawRecordContainer rawRecordContainer) { + return this.multiMap; + } + + @Override + public void setCompositeFieldDefinitions(Multimap compositeFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getCompositeFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + COMPOSITE_FIELD, entry.getValue()); + } + return compositeFields; + } + + @Override + public Map getVirtualFieldDefinitions() { + return null; + } + + @Override + public void setVirtualFieldDefinitions(Map virtualFieldDefinitions) { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultVirtualFieldSeparator() { + throw new UnsupportedOperationException(); + } + + @Override + public void setDefaultVirtualFieldSeparator(String separator) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getVirtualFields(Multimap fields) { + Multimap compositeFields = HashMultimap.create(); + for (Map.Entry entry : fields.entries()) { + compositeFields.put(entry.getKey() + VIRTUAL_FIELD, entry.getValue()); + } + return compositeFields; + } + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java new file mode 100644 index 00000000000..f2392cdc0dc --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/IngestTestSetup.java @@ -0,0 +1,65 @@ +package datawave.ingest.mapreduce; + +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; +import datawave.util.TypeRegistryTestSetup; +import org.apache.hadoop.conf.Configuration; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +/** + * Test class containing some reusable test code for creating a test Type and test records. 
+ */ +public class IngestTestSetup { + + static SimpleRawRecord createRecord(String dataTypeHandler, Configuration conf) { + Type type = setupTypeAndTypeRegistry(dataTypeHandler, conf); + long eventTime = System.currentTimeMillis(); + return createBasicRecord(eventTime, type); + } + + public static RawRecordContainer createBasicRecord() { + Type type = IngestTestSetup.createBasicType(); + long eventTime = System.currentTimeMillis(); + return createBasicRecord(eventTime, type); + } + + static Type setupTypeAndTypeRegistry(String dataTypeHandler, Configuration conf) { + String[] defaultDataTypeHandlers = {dataTypeHandler}; + Type type = new Type("file", null, null, defaultDataTypeHandlers, 10, null); + + String[] errorDataTypeHandlers = {SimpleDataTypeHandler.class.getName()}; + Type errorType = new Type(TypeRegistry.ERROR_PREFIX, null, null, errorDataTypeHandlers, 20, null); + + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type, errorType); + return type; + } + + static Type createBasicType() { + return createBasicType(new String[] {}); + } + + static Type createBasicType(String[] defaultDataTypeHandlers) { + return new Type("file", null, null, defaultDataTypeHandlers, 10, null); + } + + static SimpleRawRecord createBasicRecord(long eventTime, Type type) { + SimpleRawRecord result = new SimpleRawRecord(); + result.setDate(eventTime); + result.setRawFileTimestamp(eventTime); + result.setDataType(type); + result.setRawFileName("/some/filename"); + result.setRawData("some data".getBytes()); + result.generateId(null); + return result; + } + + static byte[] objectToRawBytes(Object map) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + new ObjectOutputStream(outputStream).writeObject(map); + return outputStream.toByteArray(); + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java index 524ff956f08..d522237f6ef 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/dateindex/DateIndexDataTypeHandlerTest.java @@ -16,6 +16,7 @@ import datawave.ingest.table.config.DateIndexTableConfigHelper; import datawave.policy.IngestPolicyEnforcer; +import datawave.util.TypeRegistryTestSetup; import datawave.util.TableName; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -58,8 +59,7 @@ public void setup() throws Exception { conf.set("testdatatype.date.index.type.to.field.map", "ACTIVITY=ACTIVITY_DATE,LOADED=LOAD_DATE"); conf.set("all" + Properties.INGEST_POLICY_ENFORCER_CLASS, IngestPolicyEnforcer.NoOpIngestPolicyEnforcer.class.getName()); - TypeRegistry.reset(); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); handler = new DateIndexDataTypeHandler<>(); handler.setup(new TaskAttemptContextImpl(conf, new TaskAttemptID())); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java index c70791f0fe0..81bedcc9829 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java +++ 
b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDeleteModeTest.java @@ -8,7 +8,6 @@ import datawave.ingest.config.RawRecordContainerImpl; import datawave.ingest.data.RawRecordContainer; import datawave.ingest.data.Type; -import datawave.ingest.data.TypeRegistry; import datawave.ingest.data.config.BaseNormalizedContent; import datawave.ingest.data.config.GroupedNormalizedContentInterface; import datawave.ingest.data.config.NormalizedContentInterface; @@ -26,6 +25,7 @@ import datawave.ingest.test.StandaloneTaskAttemptContext; import datawave.ingest.time.Now; import datawave.metadata.protobuf.EdgeMetadata; +import datawave.util.TypeRegistryTestSetup; import datawave.util.TableName; import datawave.util.time.DateHelper; import org.apache.accumulo.core.data.Key; @@ -124,13 +124,11 @@ public static void tearDown() { @Before public void setup() { - TypeRegistry.reset(); conf = new Configuration(); conf.addResource(ClassLoader.getSystemResource("config/all-config.xml")); conf.addResource(ClassLoader.getSystemResource("config/edge-ingest-config.xml")); conf.addResource(ClassLoader.getSystemResource("config/metadata-config.xml")); - TypeRegistry registry = TypeRegistry.getInstance(conf); - registry.put(type.typeName(), type); + TypeRegistryTestSetup.resetTypeRegistryWithTypes(conf, type); } private RawRecordContainer getEvent(Configuration conf) { diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java index 90e1c7b2bfe..a0aadbb90ea 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/CBMutationOutputFormatterTest.java @@ -1,9 +1,9 @@ package datawave.ingest.mapreduce.job; import datawave.common.test.logging.CommonTestAppender; -import datawave.ingest.data.TypeRegistry; import datawave.data.hash.UID; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; +import datawave.util.TypeRegistryTestSetup; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; @@ -170,8 +170,6 @@ public void setup() { uutLogger.addAppender(uutAppender); Logger.getLogger(AccumuloOutputFormat.class).addAppender(uutAppender); - - TypeRegistry.reset(); } @After @@ -513,7 +511,7 @@ public void testRecordWriterWriteWithUpdatesAndTypes() throws IOException, Inter conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -573,7 +571,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTyped() throws IOExcept conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -633,7 +631,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithUID() throws I conf.addResource(url); - 
TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -695,7 +693,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithBadUID() throw conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); @@ -757,7 +755,7 @@ public void testRecordWriterWriteWithUpdatesWithColFamilyTypedWithoutUID() throw conf.addResource(url); - TypeRegistry.getInstance(conf); + TypeRegistryTestSetup.resetTypeRegistry(conf); String simulationKey = String.format("%s.%s.%s", AccumuloOutputFormat.class.getSimpleName(), Features.SIMULATION_MODE.getDeclaringClass() .getSimpleName(), StringUtils.camelize(Features.SIMULATION_MODE.name().toLowerCase())); diff --git a/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java b/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java new file mode 100644 index 00000000000..1533402ce0f --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/util/TypeRegistryTestSetup.java @@ -0,0 +1,20 @@ +package datawave.util; + +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; +import org.apache.hadoop.conf.Configuration; + +public class TypeRegistryTestSetup { + public static TypeRegistry resetTypeRegistry(Configuration conf) { + TypeRegistry.reset(); + return TypeRegistry.getInstance(conf); + } + + public static TypeRegistry resetTypeRegistryWithTypes(Configuration conf, Type... types) { + TypeRegistry registry = resetTypeRegistry(conf); + for (Type type : types) { + registry.put(type.typeName(), type); + } + return registry; + } +}
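
Reviewer note: the patch exercises the new FieldSalvager hook only through test doubles (FakeSalvagingIngestHelper, ErroringSalvagableIngestHelper), so below is a minimal sketch of what a production ingest helper might supply. The getSalvageableEventFields signature mirrors the test helpers above; the Multimap<String, NormalizedContentInterface> type parameters, the LibraryRecordSalvager class name, the use of RawRecordContainer#getRawData(), and the line-oriented name=value parsing are illustrative assumptions, not part of this patch.

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.BaseNormalizedContent;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.mapreduce.FieldSalvager;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class LibraryRecordSalvager implements FieldSalvager {

    // Hypothetical whitelist of fields that are safe to index even when full parsing of the record failed
    private static final Set<String> SALVAGEABLE = new HashSet<>(Arrays.asList("ISBN", "dueDate", "author"));

    @Override
    public Multimap<String, NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer record) {
        Multimap<String, NormalizedContentInterface> salvaged = HashMultimap.create();
        byte[] rawData = record.getRawData();
        if (rawData == null) {
            // Nothing recoverable; FieldHarvester will still add LOAD_DATE, ORIG_FILE and RAW_FILE
            return salvaged;
        }
        // Assumes a simple line-oriented name=value payload; a real datatype would reuse its own parser here
        for (String line : new String(rawData, StandardCharsets.UTF_8).split("\n")) {
            int eq = line.indexOf('=');
            if (eq > 0) {
                String name = line.substring(0, eq).trim();
                if (SALVAGEABLE.contains(name)) {
                    salvaged.put(name, new BaseNormalizedContent(name, line.substring(eq + 1).trim()));
                }
            }
        }
        return salvaged;
    }
}

As FieldHarvesterTest demonstrates, whatever the salvager returns is treated like ordinary event fields: FieldHarvester still appends the LOAD_DATE, ORIG_FILE and RAW_FILE supplemental fields and runs virtual and composite field generation over the salvaged values, while the original getEventFields exception remains available through hasError()/getException() so EventMapper can still route the record to error handling.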