From faf7544b8975870d7b7df3ba9a64f37317768ff9 Mon Sep 17 00:00:00 2001 From: austin007008 Date: Mon, 18 Nov 2024 17:36:48 +0000 Subject: [PATCH] working iterator (not working transform) --- .../main/java/datawave/query/Constants.java | 2 + .../logic/DColumnSummaryIterator.java | 296 ++++++++++ .../logic/TermFrequencyExcerptIterator.java | 6 +- .../table/parser/ContentKeyValueFactory.java | 30 +- .../query/transformer/SummaryTransform.java | 509 ++++++++++++++++++ .../logic/DColumnSummaryIteratorTest.java | 182 +++++++ 6 files changed, 1009 insertions(+), 16 deletions(-) create mode 100644 warehouse/query-core/src/main/java/datawave/query/iterator/logic/DColumnSummaryIterator.java create mode 100644 warehouse/query-core/src/main/java/datawave/query/transformer/SummaryTransform.java create mode 100644 warehouse/query-core/src/test/java/datawave/query/iterator/logic/DColumnSummaryIteratorTest.java diff --git a/warehouse/query-core/src/main/java/datawave/query/Constants.java b/warehouse/query-core/src/main/java/datawave/query/Constants.java index c1ab9ff4f56..a38f36f874f 100644 --- a/warehouse/query-core/src/main/java/datawave/query/Constants.java +++ b/warehouse/query-core/src/main/java/datawave/query/Constants.java @@ -79,6 +79,8 @@ public class Constants { // From ingest public static final Text TERM_FREQUENCY_COLUMN_FAMILY = new Text("tf"); + public static final Text D_COLUMN_FAMILY = new Text("d"); + // content functions public static final String TERM_OFFSET_MAP_JEXL_VARIABLE_NAME = ContentFunctions.TERM_OFFSET_MAP_JEXL_VARIABLE_NAME; public static final String CONTENT_FUNCTION_NAMESPACE = ContentFunctions.CONTENT_FUNCTION_NAMESPACE; diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/logic/DColumnSummaryIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/logic/DColumnSummaryIterator.java new file mode 100644 index 00000000000..39b1ccd3da8 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/logic/DColumnSummaryIterator.java @@ -0,0 +1,296 @@ +package datawave.query.iterator.logic; + +import datawave.query.Constants; +import datawave.query.table.parser.ContentKeyValueFactory; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; + +import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.getDtUid; +import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.getDtUidFromEventKey; +import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.getSortedCFs; + +/** + * This iterator is intended to scan the d column for a specified document. The result will be a summary + * for each document scanned. + */ +public class DColumnSummaryIterator implements SortedKeyValueIterator { + private static final Logger log = LoggerFactory.getLogger(DColumnSummaryIterator.class); + private static final Collection D_COLUMN_FAMILY_BYTE_SEQUENCE = Collections + .singleton(new ArrayByteSequence(Constants.D_COLUMN_FAMILY.getBytes())); + + public static final String SUMMARY_SIZE = "summary.size"; + + public static final String CONTENT_NAME = "content.name"; + + private final ArrayList contentSummaryOrder = new ArrayList<>(); + + /** the underlying source */ + protected SortedKeyValueIterator source; + + /** The specified dt/uid column families */ + protected SortedSet columnFamilies; + + /** inclusive or exclusive dt/uid column families */ + protected boolean inclusive; + + /** the underlying D column scan range */ + protected Range scanRange; + + /** the top key */ + protected Key tk; + + /** the top value */ + protected Value tv; + + protected int summarySize; + + @Override + public boolean hasTop() { + return tk != null; + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + DColumnSummaryIterator it = new DColumnSummaryIterator(); + it.source = source.deepCopy(env); + return it; + } + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + this.source = source; + this.summarySize = Integer.parseInt(options.get(SUMMARY_SIZE)); + if(options.containsKey(CONTENT_NAME)) { + contentSummaryOrder.remove(options.get(CONTENT_NAME)); + contentSummaryOrder.add(0, options.get(CONTENT_NAME)); + } + } + + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env, List contentSummaryOrder) throws IOException { + this.contentSummaryOrder.addAll(contentSummaryOrder); + init(source, options, env); + } + + @Override + public Key getTopKey() { + return tk; + } + + @Override + public Value getTopValue() { + return tv; + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { + if (log.isDebugEnabled()) { + log.debug("{} seek'ing with requested range {}", this, range); + } + + // capture the column families and the inclusiveness + this.columnFamilies = columnFamilies != null ? getSortedCFs(columnFamilies) : Collections.emptySortedSet(); + this.inclusive = inclusive; + + // Determine the start key in the d keys + Key startKey = null; + if (range.getStartKey() != null) { + // get the start document + String dtAndUid = getDtUidFromEventKey(range.getStartKey(), true, range.isStartKeyInclusive()); + // if no start document + if (dtAndUid == null) { + // if no column families or not using these column families inclusively + if (this.columnFamilies.isEmpty() || !this.inclusive) { + // then start at the beginning of the d range + startKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY); + } else { + // otherwise start at the first document specified + startKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY, + new Text(this.columnFamilies.first() + Constants.NULL)); + } + } else { + // we had a start document specified in the start key, so start there + startKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY, new Text(dtAndUid)); + } + } + log.debug("{} calling seek to start key: {}", this, startKey); + + // Determine the end key in the d keys + Key endKey = null; + if (range.getEndKey() != null) { + // get the end document + String dtAndUid = getDtUidFromEventKey(range.getEndKey(), false, range.isEndKeyInclusive()); + // if no end document + if (dtAndUid == null) { + // if we do not have column families specified, or they are not inclusive + if (this.columnFamilies.isEmpty() || !this.inclusive) { + // then go to the end of the d keys + endKey = new Key(range.getEndKey().getRow(), Constants.D_COLUMN_FAMILY, new Text(Constants.MAX_UNICODE_STRING)); + } else { + // otherwise end at the last document specified + endKey = new Key(range.getEndKey().getRow(), Constants.D_COLUMN_FAMILY, + new Text(this.columnFamilies.last() + Constants.NULL + Constants.MAX_UNICODE_STRING)); + } + } else { + // we had an end document specified in the end key, so end there + endKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY, new Text(dtAndUid)); + } + } + log.debug("{} seek'ing to end key: {}", this, endKey); + + // if we have actually exhausted our range, then return with no next key + if (endKey != null && startKey != null && endKey.compareTo(startKey) <= 0) { + this.scanRange = null; + this.tk = null; + this.tv = null; + return; + } + + // set our d keys scan range + this.scanRange = new Range(startKey, false, endKey, false); + + if (log.isDebugEnabled()) { + log.debug("{} seek'ing to: {} from requested range {}", this, this.scanRange, range); + } + + // seek the underlying source + source.seek(this.scanRange, D_COLUMN_FAMILY_BYTE_SEQUENCE, true); + + // get the next key + next(); + } + + @Override + public void next() throws IOException { + tk = null; + tv = null; + + if (log.isTraceEnabled()) { + log.trace("{} calling next on {}", source.hasTop(), scanRange); + } + + // find a valid dt/uid (depends on initial column families set in seek call) + String dtUid = null; + while (source.hasTop() && dtUid == null) { + Key top = source.getTopKey(); + String thisDtUid = getDtUidFromDKey(top); + // if this dt and uid are in the accepted column families... + if (columnFamilies.contains(thisDtUid) == inclusive) { + // we can use this document + dtUid = thisDtUid; + } else { + seekToNextUid(top.getRow(), thisDtUid); + } + } + + // if no more d keys, then we are done. + if (!source.hasTop() || dtUid == null) { + return; + } + + Key top = source.getTopKey(); + + final Map foundContent = new HashMap<>(); + + // while we have d keys for the same document + while (source.hasTop() && dtUid.equals(getDtUidFromDKey(source.getTopKey()))) { + top = source.getTopKey(); + + // get the content name + String currentContentName = getContentName(top); + + if(contentSummaryOrder.contains(currentContentName)) { + foundContent.put(currentContentName, source.getTopValue().get()); + } + + // get the next d key + source.next(); + } + + String summary = createSummary(contentSummaryOrder, foundContent); + if(summary != null) { + tk = new Key(top.getRow(), new Text(dtUid), new Text(summary), top.getColumnVisibility()); + tv = new Value(); + return; + } + + // If we get here, we have not found content to summarize, so return null + tk = null; + tv = null; + } + + private static String createSummary(List contentSummaryOrder, Map foundContent) { + for(String name : contentSummaryOrder) { + if(foundContent.containsKey(name)) { + return name + Constants.NULL + new String(ContentKeyValueFactory.decodeAndDecompressContent(foundContent.get(name))); + } + } + return null; + } + + /** + * Seek to the dt/uid following the one passed in + * + * @param row + * a row + * @param dtAndUid + * the dt and uid string + * @throws IOException + * for issues with read/write + */ + private void seekToNextUid(Text row, String dtAndUid) throws IOException { + Key startKey = new Key(row, Constants.D_COLUMN_FAMILY, new Text(dtAndUid + '.')); + this.scanRange = new Range(startKey, false, this.scanRange.getEndKey(), this.scanRange.isEndKeyInclusive()); + if (log.isDebugEnabled()) { + log.debug("{} seek'ing to next document: {}", this, this.scanRange); + } + + source.seek(this.scanRange, Collections.singleton(new ArrayByteSequence(Constants.D_COLUMN_FAMILY.getBytes())), true); + } + + /** + * Get the content name from the end of the column qualifier of the d key + * + * @param dKey + * the d key + * @return the content name + */ + private static String getContentName(Key dKey) { + String cq = dKey.getColumnQualifier().toString(); + int index = cq.lastIndexOf(Constants.NULL); + return cq.substring(index + 1); + } + + /** + * get the dt and uid from a d key + * + * @param dKey + * the d key + * @return the dt\x00uid + */ + private static String getDtUidFromDKey(Key dKey) { + return getDtUid(dKey.getColumnQualifier().toString()); + } + + @Override + public String toString() { + return "DColumnExcerptIterator: " + contentSummaryOrder; + } + +} diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/logic/TermFrequencyExcerptIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/logic/TermFrequencyExcerptIterator.java index f3b098493f4..cd0e4279636 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/logic/TermFrequencyExcerptIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/logic/TermFrequencyExcerptIterator.java @@ -868,7 +868,7 @@ private void seekToNextUid(Text row, String dtAndUid) throws IOException { * the column families * @return a sorted set of column families as Strings */ - private static SortedSet getSortedCFs(Collection columnFamilies) { + protected static SortedSet getSortedCFs(Collection columnFamilies) { return columnFamilies.stream().map(m -> { try { return Text.decode(m.getBackingArray(), m.offset(), m.length()); @@ -916,7 +916,7 @@ private static String getDtUidFromTfKey(Key tfKey) { * inclusive boolean flag * @return the start or end document (cq) for our tf scan range. Null if dt,uid does not exist in the event key */ - private static String getDtUidFromEventKey(Key eventKey, boolean startKey, boolean inclusive) { + protected static String getDtUidFromEventKey(Key eventKey, boolean startKey, boolean inclusive) { // if an infinite end range, or unspecified end document, then no document to specify if (eventKey == null || eventKey.getColumnFamily() == null || eventKey.getColumnFamily().getLength() == 0) { return null; @@ -941,7 +941,7 @@ private static String getDtUidFromEventKey(Key eventKey, boolean startKey, boole } // get the dt/uid from the beginning of a given string - private static String getDtUid(String str) { + protected static String getDtUid(String str) { int index = str.indexOf(Constants.NULL); index = str.indexOf(Constants.NULL, index + 1); return index == -1 ? str : str.substring(0, index); diff --git a/warehouse/query-core/src/main/java/datawave/query/table/parser/ContentKeyValueFactory.java b/warehouse/query-core/src/main/java/datawave/query/table/parser/ContentKeyValueFactory.java index 558d7ef2e4a..cb20caecc80 100644 --- a/warehouse/query-core/src/main/java/datawave/query/table/parser/ContentKeyValueFactory.java +++ b/warehouse/query-core/src/main/java/datawave/query/table/parser/ContentKeyValueFactory.java @@ -45,19 +45,7 @@ public static ContentKeyValue parse(Key key, Value value, Authorizations auths, * We are storing 'documents' in this column gzip'd and base64 encoded. Base64.decode detects and handles compression. */ byte[] contents = value.get(); - try { - contents = decompress(Base64.getMimeDecoder().decode(contents)); - } catch (IOException e) { - log.error("Error decompressing Base64 encoded GZIPInputStream", e); - } catch (Exception e) { - // Thrown when data is not Base64 encoded. Try GZIP - try { - contents = decompress(contents); - } catch (IOException ioe) { - log.error("Error decompressing GZIPInputStream", e); - } - } - + contents = decodeAndDecompressContent(contents); c.setContents(contents); } @@ -66,6 +54,22 @@ public static ContentKeyValue parse(Key key, Value value, Authorizations auths, return c; } + public static byte[] decodeAndDecompressContent(byte[] contents){ + try { + contents = decompress(Base64.getMimeDecoder().decode(contents)); + } catch (IOException e) { + log.error("Error decompressing Base64 encoded GZIPInputStream", e); + } catch (Exception e) { + // Thrown when data is not Base64 encoded. Try GZIP + try { + contents = decompress(contents); + } catch (IOException ioe) { + log.error("Error decompressing GZIPInputStream", e); + } + } + return contents; + } + private static boolean isCompressed(byte[] compressed) { return (compressed[0] == (byte) (GZIPInputStream.GZIP_MAGIC)) && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); } diff --git a/warehouse/query-core/src/main/java/datawave/query/transformer/SummaryTransform.java b/warehouse/query-core/src/main/java/datawave/query/transformer/SummaryTransform.java new file mode 100644 index 00000000000..5041f0642cb --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/transformer/SummaryTransform.java @@ -0,0 +1,509 @@ +package datawave.query.transformer; + +import com.google.common.collect.Iterators; +import com.google.protobuf.InvalidProtocolBufferException; +import datawave.common.util.ArgumentChecker; +import datawave.ingest.protobuf.TermWeight; +import datawave.ingest.protobuf.TermWeightPosition; +import datawave.query.Constants; +import datawave.query.attributes.Attribute; +import datawave.query.attributes.Attributes; +import datawave.query.attributes.Content; +import datawave.query.attributes.Document; +import datawave.query.attributes.ExcerptFields; +import datawave.query.attributes.ValueTuple; +import datawave.query.function.JexlEvaluation; +import datawave.query.iterator.logic.DColumnSummaryIterator; +import datawave.query.iterator.logic.TermFrequencyExcerptIterator; +import datawave.query.postprocessing.tf.PhraseIndexes; +import datawave.query.postprocessing.tf.PhraseOffset; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.Configuration.END_OFFSET; +import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.Configuration.FIELD_NAME; +import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.Configuration.START_OFFSET; + +public class SummaryTransform extends DocumentTransform.DefaultDocumentTransform { + + private static final Logger log = LoggerFactory.getLogger(SummaryTransform.class); + + private final DColumnSummaryIterator summaryIterator; + private final ExcerptFields excerptFields; + private final IteratorEnvironment env; + private final SortedKeyValueIterator source; + + private final ArrayList hitTermValues = new ArrayList<>(); + + public SummaryTransform(ExcerptFields excerptFields, IteratorEnvironment env, SortedKeyValueIterator source) { + this(excerptFields, env, source, new TermFrequencyExcerptIterator()); + } + + public SummaryTransform(ExcerptFields excerptFields, IteratorEnvironment env, SortedKeyValueIterator source, + SortedKeyValueIterator summaryIterator) { + ArgumentChecker.notNull(excerptFields); + this.excerptFields = excerptFields; + this.env = env; + this.source = source; + this.summaryIterator = (DColumnSummaryIterator) summaryIterator; + } + + @Nullable + @Override + public Entry apply(@Nullable Entry entry) { + if (entry != null) { + Document document = entry.getValue(); + // Do not bother adding excerpts to transient documents. + if (document.isToKeep()) { + PhraseIndexes phraseIndexes = getPhraseIndexes(document); + if (!phraseIndexes.isEmpty()) { + if (log.isTraceEnabled()) { + log.trace("Fetching phrase excerpts {} for document {}", excerptFields, document.getMetadata()); + } + Set excerpts = getExcerpts(phraseIndexes); + addExcerptsToDocument(excerpts, document); + } else { + if (log.isTraceEnabled()) { + log.trace("Phrase indexes were not added to document {}, skipping", document.getMetadata()); + } + } + } + } + return entry; + } + + /** + * Retrieve the phrase indexes from the {@value #PHRASE_INDEXES_ATTRIBUTE} attribute in the document. + * + * @param document + * the document + * @return the phrase indexes + */ + private PhraseIndexes getPhraseIndexes(Document document) { + PhraseIndexes phraseIndexes = null; + PhraseIndexes allPhraseIndexes = new PhraseIndexes(); + // first lets find all the phrase indexes that came from phrase functions + if (document.containsKey(PHRASE_INDEXES_ATTRIBUTE)) { + Content content = (Content) document.get(PHRASE_INDEXES_ATTRIBUTE); + phraseIndexes = PhraseIndexes.from(content.getContent()); + allPhraseIndexes.addAll(phraseIndexes); + } + // now lets find all the terms for excerpt fields and add them to the list + if (document.containsKey(JexlEvaluation.HIT_TERM_FIELD)) { + Attributes hitList = (Attributes) document.get(JexlEvaluation.HIT_TERM_FIELD); + // for each hit term + for (Attribute attr : hitList.getAttributes()) { + ValueTuple hitTuple = attributeToHitTuple(attr); + // if this is for a requested excerpt field + if (excerptFields.containsField(hitTuple.getFieldName())) { + // get the offset, preferring offsets that overlap with existing phrases for this field/eventId + TermWeightPosition pos = getOffset(hitTuple, phraseIndexes); + if (pos != null) { + // add the term as a phrase as defined in the term weight position. Note that this will collapse with any overlapping phrases already in + // the list. + allPhraseIndexes.addIndexTriplet(String.valueOf(hitTuple.getFieldName()), keyToEventId(attr.getMetadata()), pos.getLowOffset(), + pos.getOffset()); + } + // save the hit term for later call-out + Collections.addAll(hitTermValues, ((String) hitTuple.getValue()).split(Constants.SPACE)); + } + } + } + // return the file set of phrase indexes if any + return allPhraseIndexes; + } + + /** + * Get the term weight position (offset) for the specified hit term. This will return an offset overlapping a phrase in the existing phrase index map first. + * Otherwise, the first position will be returned. + * + * @param hitTuple + * The hit term tuple + * @param phraseIndexes + * The phrase indexes + * @return The TermWeightPosition for the given hit term + */ + private TermWeightPosition getOffset(ValueTuple hitTuple, PhraseIndexes phraseIndexes) { + Key docKey = hitTuple.getSource().getMetadata(); + // if we do not know the source document key, then we cannot find the term offset + if (docKey == null) { + log.warn("Unable to find the source document for a hit term, skipping excerpt generation"); + return null; + } + String fieldName = hitTuple.getFieldName(); + String eventId = keyToEventId(docKey); + + // get the key at which we would find the term frequencies + Key tfKey = new Key(docKey.getRow().toString(), Constants.TERM_FREQUENCY_COLUMN_FAMILY.toString(), + docKey.getColumnFamily().toString() + '\u0000' + hitTuple.getValue() + '\u0000' + hitTuple.getFieldName()); + Range range = new Range(tfKey, tfKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL)); + try { + // seek directly to that key + source.seek(range, Collections.emptyList(), false); + if (source.hasTop()) { + TermWeightPosition pos = null; + + // parse the term frequencies + TermWeight.Info twInfo = TermWeight.Info.parseFrom(source.getTopValue().get()); + + // if we have phrase indexes, then find one that overlaps if any + if (phraseIndexes != null) { + pos = phraseIndexes.getOverlappingPosition(fieldName, eventId, twInfo); + } + + // if no overlapping phrases, then return the first position + if (pos == null) { + TermWeightPosition.Builder position = new TermWeightPosition.Builder(); + position.setTermWeightOffsetInfo(twInfo, 0); + pos = position.build(); + } + + return pos; + } + + } catch (InvalidProtocolBufferException e) { + log.error("Value passed to aggregator was not of type TermWeight.Info for {}", tfKey, e); + } catch (IOException e) { + log.error("Failed to scan for term frequencies at {}", tfKey, e); + } + return null; + } + + /** + * Add the excerpts to the document as part of {@value #HIT_EXCERPT}. + * + * @param excerpts + * the excerpts to add + * @param document + * the document + */ + private static void addExcerptsToDocument(Set excerpts, Document document) { + Attributes attributesWithoutScores = new Attributes(true); + Attributes attributesWithScores = new Attributes(true); + Attributes attributesOneBest = new Attributes(true); + + boolean hasScores = false; + + for (Excerpt excerpt : excerpts) { + Content contentWithoutScores = new Content(excerpt.getExcerptWithoutScores(), excerpt.getSource(), true); + attributesWithoutScores.add(contentWithoutScores); + + String excerptWithScores = excerpt.getExcerptWithScores(); + if (excerptWithScores.isBlank() || excerptWithScores.equals(TermFrequencyExcerptIterator.NOT_SCORED_MARKER)) { + continue; + } + + hasScores = true; + + Content contentWithScores = new Content(excerptWithScores, excerpt.getSource(), true); + attributesWithScores.add(contentWithScores); + Content contentOneBest = new Content(excerpt.getExcerptOneBest(), excerpt.getSource(), true); + attributesOneBest.add(contentOneBest); + } + + document.put(HIT_EXCERPT, attributesWithoutScores); + if (hasScores) { + document.put(HIT_EXCERPT_WITH_SCORES, attributesWithScores); + document.put(HIT_EXCERPT_ONE_BEST, attributesOneBest); + } + } + + /** + * Get the excerpts. + * + * @param phraseIndexes + * the pre-identified phrase offsets + * @return the excerpts + */ + private Set getExcerpts(PhraseIndexes phraseIndexes) { + final PhraseIndexes offsetPhraseIndexes = getOffsetPhraseIndexes(phraseIndexes, excerptFields); + if (offsetPhraseIndexes.isEmpty()) { + return Collections.emptySet(); + } + + // Fetch the excerpts. + Set excerpts = new HashSet<>(); + for (String field : offsetPhraseIndexes.getFields()) { + Collection indexes = offsetPhraseIndexes.getPhraseOffsets(field); + for (PhraseOffset phraseOffset : indexes) { + String eventId = phraseOffset.getEventId(); + int start = phraseOffset.getStartOffset(); + int end = phraseOffset.getEndOffset(); + if (log.isTraceEnabled()) { + log.trace("Fetching excerpt [{},{}] for field {} for document {}", start, end, field, eventId.replace('\u0000', '/')); + } + + // Construct the required range for this document. + Key startKey = eventIdToKey(eventId); + Key endKey; + if (startKey != null) { + endKey = startKey.followingKey(PartialKey.ROW_COLFAM); + } else { + throw new IllegalStateException("eventID string was null"); + } + Range range = new Range(startKey, true, endKey, false); + + Excerpt excerpt = getExcerpt(field, start, end, range, hitTermValues); + // Only retain non-blank excerpts. + if (!excerpt.isEmpty()) { + excerpts.add(excerpt); + } else { + if (log.isTraceEnabled()) { + log.trace("Failed to find excerpt [{},{}] for field {} for document {}", start, end, field, eventId.replace('\u0000', '/')); + } + } + } + } + return excerpts; + } + + /** + * Get the excerpt for the specified field. + * + * @param field + * the field + * @param start + * the start index of the excerpt + * @param end + * the end index of the excerpt + * @param range + * the range to use when seeking + * @param hitTermValues + * the term values to match + * @return the excerpt + */ + private Excerpt getExcerpt(String field, int start, int end, Range range, ArrayList hitTermValues) { + + final Map excerptIteratorOptions = new HashMap<>(); + excerptIteratorOptions.put(FIELD_NAME, field); + // We will attempt to create the excerpt we want up to two times. + // Currently, the only condition that will cause a second attempt is if we detect stop words in the TFs we scan. + // The main difference in the second attempt is that it runs with an expanded range to allow us to remove the + // stop words and still have a correctly sized excerpt + for (int attempt = 0; attempt <= 1; attempt++) { + // if this is the first attempt, set the start and end offsets using the passed in values + if (attempt == 0) { + excerptIteratorOptions.put(START_OFFSET, String.valueOf(start)); + excerptIteratorOptions.put(END_OFFSET, String.valueOf(end)); + } else { + // if this is the second attempt, set up the iterator with a larger range by adding/subtracting + // the start and end offsets by "expandedSize" + int expandedStart = Math.max(start - expandSize, 0); + int expandedEnd = end + expandSize; + excerptIteratorOptions.put(START_OFFSET, String.valueOf(expandedStart)); + excerptIteratorOptions.put(END_OFFSET, String.valueOf(expandedEnd)); + + if (log.isDebugEnabled()) { + log.debug("size of excerpt requested: {}", excerptFields.getOffset(field) * 2); + log.debug("original range is ({},{}) and the expanded range is ({},{})", start, end, expandedStart, expandedEnd); + } + } + + try { + // set all of our options for the iterator + summaryIterator.init(source, excerptIteratorOptions, env); + + // run the iterator + summaryIterator.seek(range, Collections.emptyList(), false); + + // if an excerpt is returned... + if (summaryIterator.hasTop()) { + // the excerpt will be in the column qualifier of the top key + Key topKey = summaryIterator.getTopKey(); + // The column qualifier is expected to be field\0phraseWithScores\0phraseWithoutScores\0oneBestExcerpt. + // split the column qualifier on null bytes to get the different parts + // we should have 4 parts after splitting the column qualifier on the null bytes + final String[] parts = topKey.getColumnQualifier().toString().split(Constants.NULL); + // if we don't have 4 parts after splitting the column qualifier... + if (parts.length != 4) { + if (attempt == 0) { // if this is the first attempt, try again + continue; + } + + // if this is the second attempt, log an error + if (log.isErrorEnabled()) { + log.error("{} returned top key with incorrectly-formatted column qualifier in key: {} when scanning for excerpt [{},{}] for field {} within range {} : parts= {}", + TermFrequencyExcerptIterator.class.getSimpleName(), topKey, start, end, field, range, Arrays.toString(parts)); + } + break; + } + + // if we have reached the limit of times to try, or we have no stop words removed + if (!parts[1].equals(TermFrequencyExcerptIterator.WORD_SKIPPED_MARKER)) { + // return just the excerpt parts + return new Excerpt(range.getStartKey(), parts[1], parts[2], parts[3]); + } + } else { // If no excerpt was returned on the first attempt, try again. If no excerpt was returned on the second attempt, log an error. + if (attempt == 1 && log.isErrorEnabled()) { + log.error("TermFrequencyExcerptIterator returned with hasTop() false: something went wrong in the iterator (or given bad parameters to run with)"); + log.error("The iterator options were: Field \"{}\" Range= {} StartOffset= {} EndOffset= {} HitTerms= {}", field, range, + excerptIteratorOptions.get(START_OFFSET), excerptIteratorOptions.get(END_OFFSET), hitTermValues); + break; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to scan for excerpt [" + start + "," + end + "] for field " + field + " within range " + range, e); + } + } + // when working correctly, it should always return from inside the loop so if this is reached something went very wrong + return ERROR_EXCERPT; + } + + /** + * Returned a filtered {@link PhraseIndexes} that only contains the fields for which excerpts are desired, with the indexes offset by the specified excerpt + * offset. + * + * @param phraseIndexes + * the original phrase indexes + * @param excerptFields + * the fields that we want excerpts for + * @return the filtered, offset phrase indexes + */ + private static PhraseIndexes getOffsetPhraseIndexes(PhraseIndexes phraseIndexes, ExcerptFields excerptFields) { + PhraseIndexes offsetPhraseIndexes = new PhraseIndexes(); + for (String field : excerptFields.getFields()) { + // Filter out phrases that are not in desired fields. + Collection indexes = phraseIndexes.getPhraseOffsets(field); + if (indexes != null) { + int offset = excerptFields.getOffset(field); + // Ensure the offset is modified to encompass the target excerpt range. + for (PhraseOffset indexPair : indexes) { + String eventId = indexPair.getEventId(); + int start = indexPair.getStartOffset() <= offset ? 0 : indexPair.getStartOffset() - offset; + int end = indexPair.getEndOffset() + offset + 1; // Add 1 here to offset the non-inclusive end of the range that will be used when scanning. + offsetPhraseIndexes.addIndexTriplet(field, eventId, start, end); + } + } + } + return offsetPhraseIndexes; + } + + /** + * Given a hit term attribute, return a ValueTuple representation which will give us the field and value parsed out. + * + * @param source + * a hit term attribute + * @return A ValueTuple representation of the document hit-term attribute + */ + private static ValueTuple attributeToHitTuple(Attribute source) { + String hitTuple = String.valueOf(source.getData()); + int index = hitTuple.indexOf(':'); + String fieldName = hitTuple.substring(0, index); + String value = hitTuple.substring(index + 1); + return new ValueTuple(fieldName, value, value, source); + } + + /** + * Given an event ID, return the document Key + * + * @param eventId + * eventId string + * @return the document Key + */ + private static Key eventIdToKey(String eventId) { + if (eventId != null) { + int split = eventId.indexOf('\u0000'); + if (split < 0) { + throw new IllegalStateException("Malformed eventId (expected a null separator): " + eventId); + } + return new Key(eventId.substring(0, split), eventId.substring(split + 1)); + } + return null; + } + + /** + * Given a document key, return the eventId + * + * @param docKey + * document key + * @return the event id (shard\x00dt\x00uid) + */ + private static String keyToEventId(Key docKey) { + return docKey != null ? docKey.getRow().toString() + '\u0000' + docKey.getColumnFamily().toString() : null; + } + + /** + * Add phrase excerpts to the documents from the given iterator. + * + * @param in + * the iterator source + * @return an iterator that will supply the enriched documents + */ + public Iterator> getIterator(final Iterator> in) { + return Iterators.transform(in, this); + } + + /** + * A class that holds the info for one excerpt. + */ + private static class Excerpt { + private final String excerptWithScores; + private final String excerptWithoutScores; + private final String excerptOneBest; + private final Key source; + + public Excerpt(Key source, String excerptWithScores, String excerptWithoutScores, String excerptOneBest) { + this.source = source; + this.excerptWithScores = excerptWithScores; + this.excerptWithoutScores = excerptWithoutScores; + this.excerptOneBest = excerptOneBest; + } + + public String getExcerptWithScores() { + return excerptWithScores; + } + + public String getExcerptWithoutScores() { + return excerptWithoutScores; + } + + public String getExcerptOneBest() { + return excerptOneBest; + } + + public Key getSource() { + return source; + } + + public boolean isEmpty() { + return excerptWithScores.isEmpty() && excerptWithoutScores.isEmpty() && excerptOneBest.isEmpty(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Excerpt excerpt1 = (Excerpt) o; + return (excerptWithScores.equals(excerpt1.excerptWithScores) && source.equals(excerpt1.source)) + && (excerptWithoutScores.equals(excerpt1.excerptWithoutScores)) && (excerptOneBest.equals(excerpt1.excerptOneBest)); + } + + @Override + public int hashCode() { + return Objects.hash(excerptWithScores, excerptWithoutScores, excerptOneBest, source); + } + } + +} diff --git a/warehouse/query-core/src/test/java/datawave/query/iterator/logic/DColumnSummaryIteratorTest.java b/warehouse/query-core/src/test/java/datawave/query/iterator/logic/DColumnSummaryIteratorTest.java new file mode 100644 index 00000000000..f7494fc70cd --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/iterator/logic/DColumnSummaryIteratorTest.java @@ -0,0 +1,182 @@ +package datawave.query.iterator.logic; + +import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler; +import datawave.query.Constants; +import datawave.query.iterator.SortedListKeyValueIterator; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.zip.GZIPOutputStream; + +import static datawave.query.iterator.logic.DColumnSummaryIterator.CONTENT_NAME; +import static datawave.query.iterator.logic.DColumnSummaryIterator.SUMMARY_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(EasyMockRunner.class) +public class DColumnSummaryIteratorTest extends EasyMockSupport { + + private static final Text row = new Text("20220115_1"); + private static final Text colf = new Text(ExtendedDataTypeHandler.FULL_CONTENT_COLUMN_FAMILY); + + @Mock + private IteratorEnvironment env; + private static final List> source = new ArrayList<>(); + private final Map options = new HashMap<>(); + private final DColumnSummaryIterator iterator = new DColumnSummaryIterator(); + + @BeforeClass + public static void beforeClass() throws IOException { + givenData("email", "123.456.789", "CONTENT1", "test content"); + givenData("email", "987.654.321", "CONTENT1", "test content two first"); + givenData("email", "987.654.321", "CONTENT2", "test content two second"); + givenData("pdf", "111.222.333", "CONTENT2", "this is a test"); + } + + private static void givenData(String datatype, String uid, String contentName, String content) throws IOException { + Text colq = new Text(datatype + Constants.NULL + uid + Constants.NULL + contentName); + Key key = new Key(row, colf, colq, new ColumnVisibility("ALL"), new Date().getTime()); + final ByteArrayOutputStream bos = new ByteArrayOutputStream(Math.max(content.getBytes().length / 2, 1024)); + final OutputStream b64s = Base64.getEncoder().wrap(bos); + final GZIPOutputStream gzip = new GZIPOutputStream(b64s); + gzip.write(content.getBytes()); + gzip.close(); + b64s.close(); + bos.close(); + Value value = new Value(bos.toByteArray()); + Map.Entry entry = new AbstractMap.SimpleEntry<>(key, value); + source.add(entry); + } + + @After + public void tearDown() { + options.clear(); + } + + private void givenOptions(String contentName, int summarySize) { + if(contentName != null) { + options.put(CONTENT_NAME, contentName); + } + options.put(SUMMARY_SIZE, String.valueOf(summarySize)); + } + + private void initIterator() throws IOException { + iterator.init(new SortedListKeyValueIterator(source), options, env); + } + + /** + * Verify that the expected phrase is found for the typical usage case of this iterator. + */ + @Test + public void testMatchFound1() throws IOException { + givenOptions("CONTENT1", 100); + initIterator(); + + Key startKey = new Key(row, new Text("email" + Constants.NULL + "123.456.789")); + Range range = new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false); + + iterator.seek(range, Collections.emptyList(), false); + + assertTrue(iterator.hasTop()); + + Key topKey = iterator.getTopKey(); + assertEquals(row, topKey.getRow()); + assertEquals(new Text("email" + Constants.NULL + "123.456.789"), topKey.getColumnFamily()); + assertEquals(new Text("CONTENT1" + Constants.NULL + "test content"), topKey.getColumnQualifier()); + } + + /** + * Verify that the expected phrase is found for the typical usage case of this iterator. + */ + @Test + public void testMatchFound2() throws IOException { + givenOptions("CONTENT2", 100); + initIterator(); + + Key startKey = new Key(row, new Text("email" + Constants.NULL + "987.654.321")); + Range range = new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false); + + iterator.seek(range, Collections.emptyList(), false); + + assertTrue(iterator.hasTop()); + + Key topKey = iterator.getTopKey(); + assertEquals(row, topKey.getRow()); + assertEquals(new Text("email" + Constants.NULL + "987.654.321"), topKey.getColumnFamily()); + assertEquals(new Text("CONTENT2" + Constants.NULL + "test content two second"), topKey.getColumnQualifier()); + } + + /** + * Verify that the expected phrase is found for the typical usage case of this iterator. + */ + @Test + public void testMatchFoundSpecificContentNotFirstInList() throws IOException { + givenOptions("CONTENT2", 100); + iterator.init(new SortedListKeyValueIterator(source), options, env, List.of("CONTENT1", "CONTENT2")); + + Key startKey = new Key(row, new Text("email" + Constants.NULL + "987.654.321")); + Range range = new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false); + + iterator.seek(range, Collections.emptyList(), false); + + assertTrue(iterator.hasTop()); + + Key topKey = iterator.getTopKey(); + assertEquals(row, topKey.getRow()); + assertEquals(new Text("email" + Constants.NULL + "987.654.321"), topKey.getColumnFamily()); + assertEquals(new Text("CONTENT2" + Constants.NULL + "test content two second"), topKey.getColumnQualifier()); + } + + /** + * Verify that specifying a non-matching datatype or uid results in the iterator not having a top. + */ + @Test + public void testNoMatchFoundForDataTypeAndUid() throws IOException { + givenOptions("CONTENT2", 50); + initIterator(); + + Key startKey = new Key(row, new Text("other" + Constants.NULL + "111.111.111")); + Range range = new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false); + + iterator.seek(range, Collections.emptyList(), false); + + assertFalse(iterator.hasTop()); + } + + @Test + public void testNoMatchFoundForContentName() throws IOException { + givenOptions("THISWONTBEFOUND", 100); + iterator.init(new SortedListKeyValueIterator(source), options, env); + + Key startKey = new Key(row, new Text("email" + Constants.NULL + "987.654.321")); + Range range = new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false); + + iterator.seek(range, Collections.emptyList(), false); + + assertFalse(iterator.hasTop()); + } +}