Commit aebd8d2
Fix #1438 - optionally gather fields after ingest exception
Matthew Peterson committed Feb 24, 2022
1 parent 9d01e4a commit aebd8d2
Showing 18 changed files with 1,433 additions and 213 deletions.

Large diffs are not rendered by default.

datawave/ingest/mapreduce/FieldHarvester.java
@@ -0,0 +1,255 @@
package datawave.ingest.mapreduce;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.data.normalizer.DateNormalizer;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.NormalizedFieldAndValue;
import datawave.ingest.data.config.ingest.CompositeIngest;
import datawave.ingest.data.config.ingest.FilterIngest;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.data.config.ingest.VirtualIngest;
import datawave.ingest.time.Now;
import datawave.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;

import java.util.Date;
import java.util.Map;

public class FieldHarvester {
private static final Logger log = Logger.getLogger(FieldHarvester.class);

private static Now now = Now.getInstance();

public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE";
public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE";
public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename";
public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename";
public static final String RAW_FILE_FIELDNAME = "RAW_FILE";
public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename";

private boolean createSequenceFileName;
private boolean trimSequenceFileName;
private boolean createRawFileName;
private final DateNormalizer dateNormalizer = new DateNormalizer();

private static final String SRC_FILE_DEL = "|";
private Exception exception;

public FieldHarvester(Configuration configuration) {
this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true);
this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true);
this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true);
}

public boolean hasError() {
return null != this.exception;
}

public Exception getException() {
return this.exception;
}

/**
* Updates "fields" with extracted, derived, and automatically generated fields.
*
* @param fields
* the Multimap to modify with extracted and generated fields
* @param ingestHelper
* interface to use for field extraction
* @param value
* the record from which the fields will be extracted
* @param offset
* record offset within the source file
* @param splitStart
* the splitStart for the record
*/
public void extractFields(Multimap<String,NormalizedContentInterface> fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset,
String splitStart) {
// reset exception-in-extraction tracking
this.exception = null;

// "candidateFields" holds the fields that may eventually be added to "fields"
Multimap<String,NormalizedContentInterface> candidateFields = null;
try {
// get salvaged fields if getEventFields throws exception
candidateFields = faultTolerantGetEventFields(value, ingestHelper);

// try adding supplemental fields to candidateFields, whether or not there was an exception
addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields);
} catch (Exception exception) {
this.exception = exception;
} finally {
// Add the "candidateFields" values to "fields", capturing any field-level error as a FieldNormalizationError
addErrorFreeFields(fields, candidateFields);
}
}

/**
* Calls IngestHelper.getEventFields with value. If an exception is thrown, captures it and attempts to salvage fields from the value.
*/
// package method visibility for EventMapper.getFields only
@Deprecated
Multimap<String,NormalizedContentInterface> faultTolerantGetEventFields(RawRecordContainer value, IngestHelperInterface ingestHelper) {
try {
// Parse the event into its candidate field names and values using the IngestHelperInterface.
return ingestHelper.getEventFields(value);
} catch (Exception exception) {
// delay throwing the exception
this.exception = exception;
return attemptToSalvageFields(value, ingestHelper);
}
}

// TODO: test the case where salvaged fields are empty
// package method visibility for EventMapper.getFields only
void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper,
Multimap<String,NormalizedContentInterface> fields) {
addVirtualFields(ingestHelper, fields);
addCompositeFields(ingestHelper, fields);
addLoadDateField(fields);
addFileNameFields(value, offset, splitStart, fields);
applyFieldFilters(ingestHelper, fields);
}

/*
* Populate the "fields" method parameter with the candidate field values, recording a FieldNormalizationError if any value carries a field-level error
*/
private void addErrorFreeFields(Multimap<String,NormalizedContentInterface> fields, Multimap<String,NormalizedContentInterface> candidateFields) {
if (null == candidateFields) {
return;
}
Throwable fieldError = null;
for (Map.Entry<String,NormalizedContentInterface> entry : candidateFields.entries()) {
// noinspection ThrowableResultOfMethodCallIgnored
if (entry.getValue().getError() != null) {
fieldError = entry.getValue().getError();
}
fields.put(entry.getKey(), entry.getValue());
}
if (fieldError != null) {
this.exception = new FieldNormalizationError("Failed getting all fields", fieldError);
}
}

private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the virtual fields, if applicable.
if (ingestHelper instanceof VirtualIngest) {
VirtualIngest vHelper = (VirtualIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> virtualFields = vHelper.getVirtualFields(newFields);
for (Map.Entry<String,NormalizedContentInterface> v : virtualFields.entries())
newFields.put(v.getKey(), v.getValue());
}
}

private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the composite fields, if applicable
if (ingestHelper instanceof CompositeIngest) {
CompositeIngest vHelper = (CompositeIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> compositeFields = vHelper.getCompositeFields(newFields);
for (String fieldName : compositeFields.keySet()) {
// if this is an overloaded composite field, we are replacing the existing field data
if (vHelper.isOverloadedCompositeField(fieldName)) {
newFields.removeAll(fieldName);
}
newFields.putAll(fieldName, compositeFields.get(fieldName));
}
}
}

private void addLoadDateField(Multimap<String,NormalizedContentInterface> newFields) {
// Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes
long loadDate = now.get();
NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate));
// set an indexed field value for use by the date index data type handler
loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate)));
newFields.put(LOAD_DATE_FIELDNAME, loadDateValue);
}

private void addRawFileField(RawRecordContainer value, Multimap<String,NormalizedContentInterface> newFields, String seqFileName) {
if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) {
newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName()));
}
}

private void addOrigFileField(Multimap<String,NormalizedContentInterface> newFields, long offset, String splitStart, String seqFileName) {
if (null != seqFileName) {
StringBuilder seqFile = new StringBuilder(seqFileName);

seqFile.append(SRC_FILE_DEL).append(offset);

if (null != splitStart) {
seqFile.append(SRC_FILE_DEL).append(splitStart);
}

newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString()));
}
}

private String getSeqFileName() {
String seqFileName;
seqFileName = NDC.peek();

if (trimSequenceFileName) {
seqFileName = StringUtils.substringAfterLast(seqFileName, "/");
}
return seqFileName;
}

private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap<String,NormalizedContentInterface> newFields) {
String seqFileName = null;

if (createSequenceFileName) {
seqFileName = getSeqFileName();

// place the sequence filename into the event
addOrigFileField(newFields, offset, splitStart, seqFileName);
}

addRawFileField(value, newFields, seqFileName);
}

private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also if this helper needs to filter the fields before returning, apply now
if (ingestHelper instanceof FilterIngest) {
FilterIngest fHelper = (FilterIngest) ingestHelper;
fHelper.filter(newFields);
}
}

/**
* If IngestHelper implements FieldSalvager, get the salvageable fields from value. Otherwise, return an empty Multimap.
*/
private Multimap<String,NormalizedContentInterface> attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) {
// If this helper is able, attempt to salvage a subset of the fields
if (ingestHelper instanceof FieldSalvager) {
FieldSalvager salvager = (FieldSalvager) ingestHelper;
try {
Multimap<String,NormalizedContentInterface> salvagedFields = salvager.getSalvageableEventFields(value);
if (null != salvagedFields) {
return salvagedFields;
}
} catch (Exception salvagerException) {
// Do not overwrite the original exception
if (null == this.exception) {
this.exception = new IllegalStateException("Unexpected state (FieldHarvester.exception should be non-null when salvaging)", salvagerException);
} else {
// allow original exception (this.exception) to be thrown by caller
log.error("Even salvager threw an exception", salvagerException);
}
}
}
return HashMultimap.create();
}

public static class FieldNormalizationError extends Exception {
private static final long serialVersionUID = 1L;

public FieldNormalizationError(String message, Throwable cause) {
super(message, cause);
}
}
}
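
A minimal usage sketch for orientation — this caller class is illustrative only (per the comments above, the real caller is EventMapper) and is not part of this commit:

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.mapreduce.FieldHarvester;
import org.apache.hadoop.conf.Configuration;

// Hypothetical caller, for illustration only.
public class FieldHarvesterUsageSketch {
    public Multimap<String,NormalizedContentInterface> harvest(Configuration conf, IngestHelperInterface helper, RawRecordContainer record, long offset,
                    String splitStart) throws Exception {
        Multimap<String,NormalizedContentInterface> fields = HashMultimap.create();
        FieldHarvester harvester = new FieldHarvester(conf);

        // Populates "fields" with extracted, derived, and generated values;
        // extraction failures are captured rather than thrown.
        harvester.extractFields(fields, helper, record, offset, splitStart);

        // Even on failure, "fields" may hold salvaged and supplemental values
        // that a caller can route to the error tables before rethrowing.
        if (harvester.hasError()) {
            throw harvester.getException();
        }
        return fields;
    }
}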
datawave/ingest/mapreduce/FieldSalvager.java
@@ -0,0 +1,24 @@
package datawave.ingest.mapreduce;

import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;

/**
* This optional interface complements IngestHelperInterface's handling of errors that occur within ingest jobs.
*
* One use case is when IngestHelperInterface's getEventFields throws an exception. Because the method throws instead of returning a Multimap of field
* values, the error tables previously received no information about any of the RawRecordContainer's field values.
*
* FieldSalvager implementations can attempt to provide a subset of the field values, so that the error tables have more helpful information about the
* failed record, aiding troubleshooting. An implementation could return only those fields that are well-structured and predictably formatted, and
* therefore unlikely to cause exceptions while processing.
*/
public interface FieldSalvager {
/**
* @param rawRecordContainer
*            the record whose fields could not be fully extracted
* @return a Multimap containing a subset of the record's field values, possibly empty but never null
*/
Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer rawRecordContainer);
}
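
A sketch of one possible implementation, assuming a record whose raw file name is reliably populated before parsing can fail. In practice the implementing class would be the data type's IngestHelper, since FieldHarvester only consults the salvager when ingestHelper instanceof FieldSalvager; the standalone class below is illustrative only:

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.NormalizedFieldAndValue;
import datawave.ingest.mapreduce.FieldSalvager;

// Hypothetical salvager: recovers only the raw file name, a value recorded by
// the reader before field parsing begins, so it is safe to report on failure.
public class RawFileNameSalvager implements FieldSalvager {
    @Override
    public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer value) {
        Multimap<String,NormalizedContentInterface> salvaged = HashMultimap.create();
        String rawFileName = value.getRawFileName();
        if (rawFileName != null && !rawFileName.isEmpty()) {
            salvaged.put("RAW_FILE", new NormalizedFieldAndValue("RAW_FILE", rawFileName));
        }
        return salvaged; // possibly empty, never null, per the contract above
    }
}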
@@ -20,7 +20,7 @@
import datawave.ingest.data.config.GroupedNormalizedContentInterface;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
-import datawave.ingest.mapreduce.EventMapper;
+import datawave.ingest.mapreduce.FieldHarvester;
import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition;
@@ -500,7 +500,7 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
}

// Get the load date of the event from the fields map
-Collection<NormalizedContentInterface> loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME);
+Collection<NormalizedContentInterface> loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME);
if (!loadDates.isEmpty()) {
NormalizedContentInterface nci = loadDates.iterator().next();
Date date = new Date(Long.parseLong(nci.getEventFieldValue()));
@@ -19,6 +19,7 @@
import datawave.ingest.config.RawRecordContainerImpl;
import datawave.ingest.data.config.MarkingsHelper;

+import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Test;
@@ -53,8 +54,7 @@ public void setUp() throws Exception {
conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName());
conf.set("samplecsv.reader.class", TestCSVReader.class.getName());
conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE");
-TypeRegistry.reset();
-TypeRegistry.getInstance(conf);
+TypeRegistryTestSetup.resetTypeRegistry(conf);
dataType = TypeRegistry.getType("samplecsv");
}

@@ -1,8 +1,7 @@
package datawave.ingest.data.config;

-import datawave.ingest.data.TypeRegistry;
-
import datawave.policy.IngestPolicyEnforcer;
+import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
@@ -25,8 +24,7 @@ public void setup() {
@Test(expected = IllegalArgumentException.class)
public void testInvalidConfig() {
DataTypeHelperImpl helper = new DataTypeHelperImpl();
-TypeRegistry.reset();
-TypeRegistry.getInstance(conf);
+TypeRegistryTestSetup.resetTypeRegistry(conf);
helper.setup(conf);
}

@@ -36,8 +34,7 @@ public void testValidConfig() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
Assert.assertThat(conf.get("data.name"), is("fake"));
-TypeRegistry.reset();
-TypeRegistry.getInstance(conf);
+TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -54,8 +51,7 @@ public void testDowncaseFields() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR");
-TypeRegistry.reset();
-TypeRegistry.getInstance(conf);
+TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -21,7 +21,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface {
private Map<String,String> _markings;
private Throwable _error;

-protected NonGroupedInstance() {
+public NonGroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";
@@ -141,7 +141,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface
private String _group;
private String _subGroup;

-protected GroupedInstance() {
+public GroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";
@@ -7,6 +7,7 @@
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.data.config.DataTypeHelper.Properties;

+import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
@@ -21,10 +22,7 @@ public void setup() {
conf = new Configuration();
conf.set(Properties.DATA_NAME, "test");
conf.set("test" + TypeRegistry.INGEST_HELPER, TestBaseIngestHelper.class.getName());

-TypeRegistry.reset();
-TypeRegistry.getInstance(conf);
-
+TypeRegistryTestSetup.resetTypeRegistry(conf);
}

@Test(expected = IllegalArgumentException.class)
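
The repeated TypeRegistry.reset() / TypeRegistry.getInstance(conf) pairs replaced above are consolidated into the new datawave.util.TypeRegistryTestSetup helper. Its source is among the changed files not rendered on this page; the following is a minimal sketch inferred from the call sites, not the actual file:

package datawave.util;

import datawave.ingest.data.TypeRegistry;
import org.apache.hadoop.conf.Configuration;

// Sketch inferred from the call sites in this diff; the real helper is in one
// of the changed files not rendered here and may differ.
public class TypeRegistryTestSetup {
    public static void resetTypeRegistry(Configuration conf) {
        TypeRegistry.reset();
        TypeRegistry.getInstance(conf);
    }
}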
