Skip to content

Commit

Permalink
working iterator (not working transform)
Browse files Browse the repository at this point in the history
  • Loading branch information
austin007008 committed Nov 18, 2024
1 parent cd604f3 commit faf7544
Show file tree
Hide file tree
Showing 6 changed files with 1,009 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class Constants {
// From ingest
public static final Text TERM_FREQUENCY_COLUMN_FAMILY = new Text("tf");

public static final Text D_COLUMN_FAMILY = new Text("d");

// content functions
public static final String TERM_OFFSET_MAP_JEXL_VARIABLE_NAME = ContentFunctions.TERM_OFFSET_MAP_JEXL_VARIABLE_NAME;
public static final String CONTENT_FUNCTION_NAMESPACE = ContentFunctions.CONTENT_FUNCTION_NAMESPACE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
package datawave.query.iterator.logic;

import datawave.query.Constants;
import datawave.query.table.parser.ContentKeyValueFactory;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;

import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.getDtUid;
import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.getDtUidFromEventKey;
import static datawave.query.iterator.logic.TermFrequencyExcerptIterator.getSortedCFs;

/**
 * This iterator scans the "d" (document) column family for a specified document range. For each
 * document (dt\x00uid) encountered it emits a single summary key whose column qualifier is the
 * chosen content name followed by the decoded document content.
 * <p>
 * Not thread-safe; intended for single-threaded Accumulo iterator use.
 */
public class DColumnSummaryIterator implements SortedKeyValueIterator<Key,Value> {
    private static final Logger log = LoggerFactory.getLogger(DColumnSummaryIterator.class);

    /** column family filter handed to the underlying source: restrict the scan to "d" keys */
    private static final Collection<ByteSequence> D_COLUMN_FAMILY_BYTE_SEQUENCE = Collections
                    .singleton(new ArrayByteSequence(Constants.D_COLUMN_FAMILY.getBytes()));

    /** option name for the requested summary size (required) */
    public static final String SUMMARY_SIZE = "summary.size";

    /** option name for the preferred content name to summarize first (optional) */
    public static final String CONTENT_NAME = "content.name";

    /** preference order of content names; the first name found in a document wins */
    private final ArrayList<String> contentSummaryOrder = new ArrayList<>();

    /** the underlying source */
    protected SortedKeyValueIterator<Key,Value> source;

    /** The specified dt/uid column families */
    protected SortedSet<String> columnFamilies;

    /** inclusive or exclusive dt/uid column families */
    protected boolean inclusive;

    /** the underlying D column scan range */
    protected Range scanRange;

    /** the top key */
    protected Key tk;

    /** the top value */
    protected Value tv;

    /** requested summary size parsed from options (not yet applied when building the summary text) */
    protected int summarySize;

    @Override
    public boolean hasTop() {
        return tk != null;
    }

    @Override
    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
        DColumnSummaryIterator it = new DColumnSummaryIterator();
        it.source = source.deepCopy(env);
        // copy configuration state so the copy behaves like this iterator
        // (previously dropped, leaving the copy with no content order and a zero summary size)
        it.summarySize = summarySize;
        it.contentSummaryOrder.addAll(contentSummaryOrder);
        return it;
    }

    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
        this.source = source;
        // fail fast with a clear message when the required option is absent
        // (IllegalArgumentException is the superclass of the NumberFormatException previously thrown)
        String size = options.get(SUMMARY_SIZE);
        if (size == null) {
            throw new IllegalArgumentException("Missing required option: " + SUMMARY_SIZE);
        }
        this.summarySize = Integer.parseInt(size);
        // move the requested content name to the front of the preference order
        if (options.containsKey(CONTENT_NAME)) {
            contentSummaryOrder.remove(options.get(CONTENT_NAME));
            contentSummaryOrder.add(0, options.get(CONTENT_NAME));
        }
    }

    /**
     * Convenience initializer that seeds the content-name preference order before applying options.
     *
     * @param contentSummaryOrder
     *            initial content-name preference order (CONTENT_NAME option, if set, is moved to the front)
     */
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env, List<String> contentSummaryOrder)
                    throws IOException {
        this.contentSummaryOrder.addAll(contentSummaryOrder);
        init(source, options, env);
    }

    @Override
    public Key getTopKey() {
        return tk;
    }

    @Override
    public Value getTopValue() {
        return tv;
    }

    @Override
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("{} seek'ing with requested range {}", this, range);
        }

        // capture the column families and the inclusiveness
        this.columnFamilies = columnFamilies != null ? getSortedCFs(columnFamilies) : Collections.emptySortedSet();
        this.inclusive = inclusive;

        // Determine the start key in the d keys
        Key startKey = null;
        if (range.getStartKey() != null) {
            // get the start document
            String dtAndUid = getDtUidFromEventKey(range.getStartKey(), true, range.isStartKeyInclusive());
            // if no start document
            if (dtAndUid == null) {
                // if no column families or not using these column families inclusively
                if (this.columnFamilies.isEmpty() || !this.inclusive) {
                    // then start at the beginning of the d range
                    startKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY);
                } else {
                    // otherwise start at the first document specified
                    startKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY,
                                    new Text(this.columnFamilies.first() + Constants.NULL));
                }
            } else {
                // we had a start document specified in the start key, so start there
                startKey = new Key(range.getStartKey().getRow(), Constants.D_COLUMN_FAMILY, new Text(dtAndUid));
            }
        }
        log.debug("{} calling seek to start key: {}", this, startKey);

        // Determine the end key in the d keys
        Key endKey = null;
        if (range.getEndKey() != null) {
            // get the end document
            String dtAndUid = getDtUidFromEventKey(range.getEndKey(), false, range.isEndKeyInclusive());
            // if no end document
            if (dtAndUid == null) {
                // if we do not have column families specified, or they are not inclusive
                if (this.columnFamilies.isEmpty() || !this.inclusive) {
                    // then go to the end of the d keys
                    endKey = new Key(range.getEndKey().getRow(), Constants.D_COLUMN_FAMILY, new Text(Constants.MAX_UNICODE_STRING));
                } else {
                    // otherwise end at the last document specified
                    endKey = new Key(range.getEndKey().getRow(), Constants.D_COLUMN_FAMILY,
                                    new Text(this.columnFamilies.last() + Constants.NULL + Constants.MAX_UNICODE_STRING));
                }
            } else {
                // we had an end document specified in the end key, so end there
                // (fixed: previously built from the START key's row, truncating multi-row ranges)
                endKey = new Key(range.getEndKey().getRow(), Constants.D_COLUMN_FAMILY, new Text(dtAndUid));
            }
        }
        log.debug("{} seek'ing to end key: {}", this, endKey);

        // if we have actually exhausted our range, then return with no next key
        if (endKey != null && startKey != null && endKey.compareTo(startKey) <= 0) {
            this.scanRange = null;
            this.tk = null;
            this.tv = null;
            return;
        }

        // set our d keys scan range
        this.scanRange = new Range(startKey, false, endKey, false);

        if (log.isDebugEnabled()) {
            log.debug("{} seek'ing to: {} from requested range {}", this, this.scanRange, range);
        }

        // seek the underlying source
        source.seek(this.scanRange, D_COLUMN_FAMILY_BYTE_SEQUENCE, true);

        // get the next key
        next();
    }

    @Override
    public void next() throws IOException {
        tk = null;
        tv = null;

        if (log.isTraceEnabled()) {
            log.trace("{} calling next on {}", source.hasTop(), scanRange);
        }

        // find a valid dt/uid (depends on initial column families set in seek call)
        String dtUid = null;
        while (source.hasTop() && dtUid == null) {
            Key top = source.getTopKey();
            String thisDtUid = getDtUidFromDKey(top);
            // if this dt and uid are in the accepted column families...
            if (columnFamilies.contains(thisDtUid) == inclusive) {
                // we can use this document
                dtUid = thisDtUid;
            } else {
                // skip the entire document in one seek rather than many next() calls
                seekToNextUid(top.getRow(), thisDtUid);
            }
        }

        // if no more d keys, then we are done.
        if (!source.hasTop() || dtUid == null) {
            return;
        }

        Key top = source.getTopKey();

        // content name -> raw (still encoded/compressed) document bytes found for this document
        final Map<String,byte[]> foundContent = new HashMap<>();

        // while we have d keys for the same document
        while (source.hasTop() && dtUid.equals(getDtUidFromDKey(source.getTopKey()))) {
            top = source.getTopKey();

            // get the content name
            String currentContentName = getContentName(top);

            // only retain content we could actually use in a summary
            if (contentSummaryOrder.contains(currentContentName)) {
                foundContent.put(currentContentName, source.getTopValue().get());
            }

            // get the next d key
            source.next();
        }

        String summary = createSummary(contentSummaryOrder, foundContent);
        if (summary != null) {
            tk = new Key(top.getRow(), new Text(dtUid), new Text(summary), top.getColumnVisibility());
            tv = new Value();
            return;
        }

        // If we get here, we have not found content to summarize, so return null
        tk = null;
        tv = null;
    }

    /**
     * Build the summary for a document: the first content name in the preference order that was
     * found, followed by a null byte and the decoded/decompressed content.
     *
     * @param contentSummaryOrder
     *            the content-name preference order
     * @param foundContent
     *            content name to raw stored bytes for the document
     * @return the summary string, or null if none of the preferred content names were found
     */
    private static String createSummary(List<String> contentSummaryOrder, Map<String,byte[]> foundContent) {
        for (String name : contentSummaryOrder) {
            if (foundContent.containsKey(name)) {
                return name + Constants.NULL + new String(ContentKeyValueFactory.decodeAndDecompressContent(foundContent.get(name)));
            }
        }
        return null;
    }

    /**
     * Seek to the dt/uid following the one passed in
     *
     * @param row
     *            a row
     * @param dtAndUid
     *            the dt and uid string
     * @throws IOException
     *             for issues with read/write
     */
    private void seekToNextUid(Text row, String dtAndUid) throws IOException {
        // '.' is the character immediately after the null byte, so this key sorts past every
        // column qualifier of the current document
        Key startKey = new Key(row, Constants.D_COLUMN_FAMILY, new Text(dtAndUid + '.'));
        this.scanRange = new Range(startKey, false, this.scanRange.getEndKey(), this.scanRange.isEndKeyInclusive());
        if (log.isDebugEnabled()) {
            log.debug("{} seek'ing to next document: {}", this, this.scanRange);
        }

        source.seek(this.scanRange, D_COLUMN_FAMILY_BYTE_SEQUENCE, true);
    }

    /**
     * Get the content name from the end of the column qualifier of the d key
     *
     * @param dKey
     *            the d key
     * @return the content name
     */
    private static String getContentName(Key dKey) {
        String cq = dKey.getColumnQualifier().toString();
        int index = cq.lastIndexOf(Constants.NULL);
        return cq.substring(index + 1);
    }

    /**
     * get the dt and uid from a d key
     *
     * @param dKey
     *            the d key
     * @return the dt\x00uid
     */
    private static String getDtUidFromDKey(Key dKey) {
        return getDtUid(dKey.getColumnQualifier().toString());
    }

    @Override
    public String toString() {
        // fixed: previously reported the wrong class name ("DColumnExcerptIterator")
        return "DColumnSummaryIterator: " + contentSummaryOrder;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ private void seekToNextUid(Text row, String dtAndUid) throws IOException {
* the column families
* @return a sorted set of column families as Strings
*/
private static SortedSet<String> getSortedCFs(Collection<ByteSequence> columnFamilies) {
protected static SortedSet<String> getSortedCFs(Collection<ByteSequence> columnFamilies) {
return columnFamilies.stream().map(m -> {
try {
return Text.decode(m.getBackingArray(), m.offset(), m.length());
Expand Down Expand Up @@ -916,7 +916,7 @@ private static String getDtUidFromTfKey(Key tfKey) {
* inclusive boolean flag
* @return the start or end document (cq) for our tf scan range. Null if dt,uid does not exist in the event key
*/
private static String getDtUidFromEventKey(Key eventKey, boolean startKey, boolean inclusive) {
protected static String getDtUidFromEventKey(Key eventKey, boolean startKey, boolean inclusive) {
// if an infinite end range, or unspecified end document, then no document to specify
if (eventKey == null || eventKey.getColumnFamily() == null || eventKey.getColumnFamily().getLength() == 0) {
return null;
Expand All @@ -941,7 +941,7 @@ private static String getDtUidFromEventKey(Key eventKey, boolean startKey, boole
}

// get the dt/uid from the beginning of a given string
private static String getDtUid(String str) {
protected static String getDtUid(String str) {
int index = str.indexOf(Constants.NULL);
index = str.indexOf(Constants.NULL, index + 1);
return index == -1 ? str : str.substring(0, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,7 @@ public static ContentKeyValue parse(Key key, Value value, Authorizations auths,
* We are storing 'documents' in this column gzip'd and base64 encoded. Base64.decode detects and handles compression.
*/
byte[] contents = value.get();
try {
contents = decompress(Base64.getMimeDecoder().decode(contents));
} catch (IOException e) {
log.error("Error decompressing Base64 encoded GZIPInputStream", e);
} catch (Exception e) {
// Thrown when data is not Base64 encoded. Try GZIP
try {
contents = decompress(contents);
} catch (IOException ioe) {
log.error("Error decompressing GZIPInputStream", e);
}
}

contents = decodeAndDecompressContent(contents);
c.setContents(contents);
}

Expand All @@ -66,6 +54,22 @@ public static ContentKeyValue parse(Key key, Value value, Authorizations auths,
return c;
}

/**
 * Decode and decompress stored document content. Documents are stored gzip'd and base64 encoded;
 * the MIME Base64 decoder is tried first, and if the data turns out not to be Base64 encoded the
 * bytes are decompressed directly. On failure the input is returned unchanged (best-effort; errors
 * are logged, not thrown).
 *
 * @param contents
 *            the raw stored bytes
 * @return the decoded and decompressed bytes, or the original bytes if decoding fails
 */
public static byte[] decodeAndDecompressContent(byte[] contents) {
    try {
        contents = decompress(Base64.getMimeDecoder().decode(contents));
    } catch (IOException e) {
        log.error("Error decompressing Base64 encoded GZIPInputStream", e);
    } catch (Exception e) {
        // Thrown when data is not Base64 encoded. Try GZIP
        try {
            contents = decompress(contents);
        } catch (IOException ioe) {
            // fixed: previously logged the outer decode exception 'e' instead of the
            // decompression failure itself, hiding the real cause
            log.error("Error decompressing GZIPInputStream", ioe);
        }
    }
    return contents;
}

private static boolean isCompressed(byte[] compressed) {
return (compressed[0] == (byte) (GZIPInputStream.GZIP_MAGIC)) && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
}
Expand Down
Loading

0 comments on commit faf7544

Please sign in to comment.