Merge branch 'integration' into feature/delay-ivarators-based-on-cardinality
apmoriarty committed Dec 3, 2024
2 parents 446845b + 3b31c16 commit cbd343b
Showing 59 changed files with 1,104 additions and 193 deletions.
@@ -184,7 +184,7 @@ function datawaveIngestWikipedia() {
[ ! -f "${wikipediaRawFile}" ] && error "File not found: ${wikipediaRawFile}" && return 1

local wikipediaHdfsFile="${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/$( basename ${wikipediaRawFile} )"
local putFileCommand="hdfs dfs -copyFromLocal ${wikipediaRawFile} ${wikipediaHdfsFile}"
local putFileCommand="hdfs dfs -copyFromLocal -f ${wikipediaRawFile} ${wikipediaHdfsFile}"

local inputFormat="datawave.ingest.wikipedia.WikipediaEventInputFormat"
local jobCommand="${DW_DATAWAVE_INGEST_HOME}/bin/ingest/live-ingest.sh ${wikipediaHdfsFile} ${DW_DATAWAVE_INGEST_NUM_SHARDS} -inputFormat ${inputFormat} -data.name.override=wikipedia ${extraOpts}"
@@ -211,7 +211,7 @@ function datawaveIngestCsv() {
[ ! -f "${csvRawFile}" ] && error "File not found: ${csvRawFile}" && return 1

local csvHdfsFile="${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/$( basename ${csvRawFile} )"
local putFileCommand="hdfs dfs -copyFromLocal ${csvRawFile} ${csvHdfsFile}"
local putFileCommand="hdfs dfs -copyFromLocal -f ${csvRawFile} ${csvHdfsFile}"

local inputFormat="datawave.ingest.csv.mr.input.CSVFileInputFormat"
local jobCommand="${DW_DATAWAVE_INGEST_HOME}/bin/ingest/live-ingest.sh ${csvHdfsFile} ${DW_DATAWAVE_INGEST_NUM_SHARDS} -inputFormat ${inputFormat} -data.name.override=mycsv ${extraOpts}"
@@ -232,7 +232,7 @@ function datawaveIngestJson() {
[ ! -f "${jsonRawFile}" ] && error "File not found: ${jsonRawFile}" && return 1

local jsonHdfsFile="${DW_DATAWAVE_INGEST_HDFS_BASEDIR}/$( basename ${jsonRawFile} )"
local putFileCommand="hdfs dfs -copyFromLocal ${jsonRawFile} ${jsonHdfsFile}"
local putFileCommand="hdfs dfs -copyFromLocal -f ${jsonRawFile} ${jsonHdfsFile}"

local inputFormat="datawave.ingest.json.mr.input.JsonInputFormat"
local jobCommand="${DW_DATAWAVE_INGEST_HOME}/bin/ingest/live-ingest.sh ${jsonHdfsFile} ${DW_DATAWAVE_INGEST_NUM_SHARDS} -inputFormat ${inputFormat} -data.name.override=myjson ${extraOpts}"
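
The only functional change to these three ingest helpers is the -f flag on hdfs dfs -copyFromLocal, which tells HDFS to overwrite the destination if it already exists, so the canned examples can be re-ingested without the copy failing because the target file is already present. For readers more familiar with the Java API, a minimal equivalent sketch (illustrative only, not part of this commit; the two paths are hypothetical placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverwriteCopyExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // delSrc=false keeps the local file; overwrite=true mirrors the -f flag
        fs.copyFromLocalFile(false, true,
                new Path("/tmp/example-raw-file.json"),      // hypothetical local source
                new Path("/Ingest/example-raw-file.json"));  // hypothetical HDFS destination
        fs.close();
    }
}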
@@ -347,3 +347,10 @@ function datawaveIngestTarballName() {
local dwVersion="$(getDataWaveVersion)"
echo "$( basename "${DW_DATAWAVE_INGEST_TARBALL/-\*-/-$dwVersion-}" )"
}

function datawaveIngestExamples() {
datawaveIngestWikipedia ${DW_DATAWAVE_INGEST_TEST_FILE_WIKI}
datawaveIngestJson ${DW_DATAWAVE_INGEST_TEST_FILE_JSON}
datawaveIngestCsv ${DW_DATAWAVE_INGEST_TEST_FILE_CSV}
}

@@ -159,13 +159,6 @@ function initializeDatawaveTables() {
fi
}

function ingestExampleData() {
# Ingest some canned, example data files
datawaveIngestWikipedia "${DW_DATAWAVE_INGEST_TEST_FILE_WIKI}"
datawaveIngestJson "${DW_DATAWAVE_INGEST_TEST_FILE_JSON}"
datawaveIngestCsv "${DW_DATAWAVE_INGEST_TEST_FILE_CSV}"
}


initializeDatawaveTables

@@ -186,4 +179,4 @@ info "See \$DW_CLOUD_HOME/bin/services/datawave/bootstrap-ingest.sh to view/edit

# Ingest raw data examples, if appropriate...

[ "${DW_REDEPLOY_IN_PROGRESS}" != true ] && [ "${DW_DATAWAVE_INGEST_TEST_SKIP}" == false ] && ingestExampleData
[ "${DW_REDEPLOY_IN_PROGRESS}" != true ] && [ "${DW_DATAWAVE_INGEST_TEST_SKIP}" == false ] && datawaveIngestExamples
6 changes: 3 additions & 3 deletions contrib/datawave-quickstart/bin/services/hadoop/bootstrap.sh
@@ -45,8 +45,8 @@ dfs.replication 1"

DW_HADOOP_MR_HEAPDUMP_DIR="${DW_CLOUD_DATA}/heapdumps"
# mapred-site.xml (Format: <property-name><space><property-value>{<newline>})
DW_HADOOP_MAPRED_SITE_CONF="mapreduce.jobhistory.address http://${DW_BIND_HOST}:8020
mapreduce.jobhistory.webapp.address http://${DW_BIND_HOST}:8021
DW_HADOOP_MAPRED_SITE_CONF="mapreduce.jobhistory.address ${DW_BIND_HOST}:8020
mapreduce.jobhistory.webapp.address ${DW_BIND_HOST}:8021
mapreduce.jobhistory.intermediate-done-dir ${DW_HADOOP_MR_INTER_DIR}
mapreduce.jobhistory.done-dir ${DW_HADOOP_MR_DONE_DIR}
mapreduce.map.memory.mb 2048
@@ -72,7 +72,7 @@ yarn.nodemanager.pmem-check-enabled false
yarn.nodemanager.vmem-check-enabled false
yarn.nodemanager.resource.memory-mb 6144
yarn.app.mapreduce.am.resource.mb 1024
yarn.log.server.url http://localhost:8070/jobhistory/logs"
yarn.log.server.url http://localhost:8021/jobhistory/logs"

# capacity-scheduler.xml (Format: <property-name><space><property-value>{<newline>})
DW_HADOOP_CAPACITY_SCHEDULER_CONF="yarn.scheduler.capacity.maximum-applications 10000
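
Two related fixes in this hunk: mapreduce.jobhistory.address and mapreduce.jobhistory.webapp.address take bare host:port values (they are bind addresses, not URLs), so the http:// prefix is dropped, and yarn.log.server.url is pointed at the JobHistory webapp port configured above (8021) so the YARN UI can resolve aggregated job logs; the old 8070 value no longer matched the webapp address configured here.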
2 changes: 1 addition & 1 deletion contrib/datawave-quickstart/docker/pom.xml
@@ -500,4 +500,4 @@
</build>
</profile>
</profiles>
</project>
</project>
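(The pom.xml hunk shows the same closing </project> tag on both sides; presumably the change is just adding the missing newline at the end of the file.)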
@@ -35,6 +35,8 @@
import datawave.ingest.util.cache.watch.FileRuleCacheValue;
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterRule;
import datawave.util.CompositeTimestamp;

/**
* This class provides a subclass of the {@code org.apache.accumulo.core.iterators.Filter} class and implements the {@code Option Describer} interface. It
@@ -167,7 +169,7 @@ public boolean accept(Key k, Value v) {
return true;

// short circuit check
long timeStamp = k.getTimestamp();
long timeStamp = CompositeTimestamp.getAgeOffDate(k.getTimestamp());
if (timeStamp > this.shortCircuitDateMillis)
return true;
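
This is the core pattern repeated across the age-off filters below: instead of comparing the key's raw timestamp against the cutoff, the filters now compare CompositeTimestamp.getAgeOffDate(k.getTimestamp()), so a key whose timestamp encodes a separate, later age-off date survives even when its event date is past the TTL. A minimal sketch of the idea, using only the CompositeTimestamp methods that appear elsewhere in this diff and assuming (as the updated tests rely on) that getAgeOffDate behaves like the timestamp itself for plain, non-composite values; the 30-day cutoff is illustrative:

import datawave.util.CompositeTimestamp;

public class AgeOffDateSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long eventDate = now - 365L * CompositeTimestamp.MILLIS_PER_DAY; // event happened a year ago
        long ageOffDate = now + CompositeTimestamp.MILLIS_PER_DAY;       // but should not age off until tomorrow
        long cutoff = now - 30L * CompositeTimestamp.MILLIS_PER_DAY;     // illustrative 30-day TTL cutoff

        long plainTs = eventDate;
        long compositeTs = CompositeTimestamp.getCompositeTimeStamp(eventDate, ageOffDate);

        // Plain timestamp: the age-off date is effectively the event date, which is past the cutoff -> aged off.
        System.out.println(CompositeTimestamp.getAgeOffDate(plainTs) > cutoff);     // false
        // Composite timestamp: the encoded age-off date (tomorrow) is compared instead -> kept.
        System.out.println(CompositeTimestamp.getAgeOffDate(compositeTs) > cutoff); // true
    }
}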

@@ -13,6 +13,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
import datawave.util.CompositeTimestamp;

/**
* This class provides an abstract base class to be extended to filter based on matching a REGEX to the {@code String} object that represents some portion of a
@@ -69,7 +70,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
String keyField = getKeyField(k, v);
Matcher matcher = pattern.matcher(keyField);
if (matcher.find()) {
long timeStamp = k.getTimestamp();
long timeStamp = CompositeTimestamp.getAgeOffDate(k.getTimestamp());
dtFlag = timeStamp > period.getCutOffMilliseconds();
if (log.isTraceEnabled()) {
log.trace("timeStamp = " + timeStamp);
@@ -10,6 +10,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
import datawave.util.CompositeTimestamp;
import datawave.util.StringUtils;

/**
@@ -69,7 +70,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
dtFlag = true;
} else {
if (hasToken(k, v, patternBytes)) {
long timeStamp = k.getTimestamp();
long timeStamp = CompositeTimestamp.getAgeOffDate(k.getTimestamp());
dtFlag = timeStamp > period.getCutOffMilliseconds();
if (log.isTraceEnabled()) {
log.trace("timeStamp = " + timeStamp);
@@ -10,6 +10,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
import datawave.util.CompositeTimestamp;

/**
* TokenizingAgeoffFilter cuts a field into tokens (splitting at a specified set of delimiters), and makes ageoff decisions based on whether or not any of the
Expand Down Expand Up @@ -119,7 +120,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value V) {
cutoffTimestamp -= calculatedTTL;
}
ruleApplied = true;
return k.getTimestamp() > cutoffTimestamp;
return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > cutoffTimestamp;
}

@Override
@@ -16,6 +16,7 @@
import com.google.common.collect.Sets;

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.util.CompositeTimestamp;

/**
* Data type age off filter. Traverses through indexed tables
@@ -209,11 +210,11 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
if (dataTypeCutoff == null) {
if (defaultCutoffTime >= 0) {
ruleApplied = true;
accept = k.getTimestamp() > defaultCutoffTime;
accept = CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > defaultCutoffTime;
}
} else {
ruleApplied = true;
accept = k.getTimestamp() > dataTypeCutoff;
accept = CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > dataTypeCutoff;
}
// after age-off is applied check, if we are accepting this KeyValue and this is a Scan on a dataType which only accepts on timestamp
// only continue to accept the KeyValue if the timestamp for the dataType matches what is configured
@@ -17,6 +17,7 @@

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.iterators.filter.ColumnVisibilityOrFilter;
import datawave.util.CompositeTimestamp;

/**
* Field age off filter. Traverses through indexed tables and non-indexed tables. Example follows. Note that any field TTL will follow the same units specified
@@ -208,7 +209,7 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {
Long dataTypeCutoff = (fieldTimes.containsKey(field)) ? fieldTimes.get(field) : null;
if (dataTypeCutoff != null) {
ruleApplied = true;
return k.getTimestamp() > dataTypeCutoff;
return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > dataTypeCutoff;
}

return true;
@@ -6,6 +6,7 @@
import org.apache.log4j.Logger;

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.util.CompositeTimestamp;

/**
* Data type age off filter. Traverses through indexed tables
@@ -59,7 +60,11 @@ public boolean accept(AgeOffPeriod period, Key k, Value v) {

// this rule determines whether to accept / deny (ageoff) a K/V
// based solely on whether a timestamp is before (older than) the cutoff for aging off
return k.getTimestamp() > period.getCutOffMilliseconds();
if (CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > period.getCutOffMilliseconds()) {
return true;
} else {
return false;
}
}

/**
@@ -11,6 +11,7 @@
import datawave.iterators.filter.ageoff.AgeOffPeriod;
import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterOptions;
import datawave.util.CompositeTimestamp;
import datawave.util.StringUtils;

/**
@@ -108,7 +109,7 @@ public boolean accept(AgeOffPeriod ageOffPeriod, Key k, Value V) {
cutOff -= timeToLive;
}
this.filterRuleApplied = true;
return k.getTimestamp() > cutOff;
return CompositeTimestamp.getAgeOffDate(k.getTimestamp()) > cutOff;
}
}
return true;
@@ -28,6 +28,7 @@
import datawave.iterators.filter.ageoff.ConfigurableIteratorEnvironment;
import datawave.iterators.filter.ageoff.FilterOptions;
import datawave.query.iterator.SortedListKeyValueIterator;
import datawave.util.CompositeTimestamp;

public class ConfigurableAgeOffFilterTest {

@@ -138,6 +139,9 @@ public void testAcceptKeyValue_TtlSet() throws Exception {
// copy configs to actual filter we are testing
filter.initialize(wrapper);

long tomorrow = System.currentTimeMillis() + CompositeTimestamp.MILLIS_PER_DAY;
long compositeTS = CompositeTimestamp.getCompositeTimeStamp(daysAgo(365), tomorrow);

// brand new key should be good
assertThat(filter.accept(new Key(), VALUE), is(true));
// first five will hit the ttl short circuit
@@ -155,6 +159,8 @@
assertThat(filter.accept(getKey("foo", daysAgo(8)), VALUE), is(true));
// this is really old and matches so should not be accepted
assertThat(filter.accept(getKey("foo", daysAgo(365)), VALUE), is(false));
// this is really old and matches, but has a future age off date, so should be accepted
assertThat(filter.accept(getKey("foo", compositeTS), VALUE), is(true));

}
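
The additions here are the unit-level version of the sketch earlier in this diff: the event-date half of compositeTS is 365 days old and would normally be rejected (as the preceding assertion shows), but the composite timestamp carries an age-off date of tomorrow, so the filter accepts the key.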

@@ -14,6 +14,7 @@

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.iterators.filter.AgeOffTtlUnits;
import datawave.util.CompositeTimestamp;

public class FieldAgeOffFilterTest {
private static final String VISIBILITY_PATTERN = "MY_VIS";
@@ -84,6 +85,7 @@ public void testIndexTrueUsesDefaultWhenFieldLacksTtl() {
Key key = new Key("1234", "field_z\\x00my-uuid", "field_z\u0000value", VISIBILITY_PATTERN, tenSecondsAgo);
Assert.assertFalse(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
Assert.assertTrue(ageOffFilter.isFilterRuleApplied());

key = new Key("1234", "field_y", "field_y\u0000value", VISIBILITY_PATTERN, tenSecondsAgo);
Assert.assertFalse(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
Assert.assertTrue(ageOffFilter.isFilterRuleApplied());
@@ -236,6 +238,39 @@ public void testIgnoresDocument() {
Assert.assertFalse(ageOffFilter.isFilterRuleApplied());
}

@Test
public void testCompositeTimestamp() {
EditableAccumuloConfiguration conf = new EditableAccumuloConfiguration(DefaultConfiguration.getInstance());
conf.put("table.custom.isindextable", "true");
iterEnv.setConf(conf);

long tenSecondsAgo = System.currentTimeMillis() - (10L * ONE_SEC);
long tomorrow = System.currentTimeMillis() + CompositeTimestamp.MILLIS_PER_DAY;

long compositeTS = CompositeTimestamp.getCompositeTimeStamp(tenSecondsAgo, tomorrow);

FieldAgeOffFilter ageOffFilter = new FieldAgeOffFilter();
FilterOptions filterOptions = createFilterOptionsWithPattern();
// set the default to 5 seconds
filterOptions.setTTL(5L);
filterOptions.setTTLUnits(AgeOffTtlUnits.SECONDS);
// set up a ttl for field_y only (other fields fall back to the default)
filterOptions.setOption("fields", "field_y");
filterOptions.setOption("field_y.ttl", "2"); // 2 seconds
ageOffFilter.init(filterOptions, iterEnv);

// age off date allows this to accept
Key key = new Key("1234", "field_y", "field_y\u0000value", VISIBILITY_PATTERN, compositeTS);
Assert.assertTrue(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
Assert.assertTrue(ageOffFilter.isFilterRuleApplied());

// vanilla date does not
key = new Key("1234", "field_y", "field_y\u0000value", VISIBILITY_PATTERN, tenSecondsAgo);
Assert.assertFalse(ageOffFilter.accept(filterOptions.getAgeOffPeriod(System.currentTimeMillis()), key, new Value()));
Assert.assertTrue(ageOffFilter.isFilterRuleApplied());

}
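
Worth noting in this new test: both keys are identical apart from the timestamp, and both are older than the 2-second TTL on field_y by event date; only the composite timestamp's future age-off date flips the decision to accept, which is exactly the delayed age-off behavior the filter changes above are meant to enable.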

@Test
public void testKeepsMatchBeforeTtl() {
long oneSecondAgo = System.currentTimeMillis() - (1 * ONE_SEC);