Skip to content

Commit

Permalink
Implements the chained SSDeep Discovery Query (#2242)
Browse files Browse the repository at this point in the history
  • Loading branch information
drewfarris authored Mar 15, 2024
1 parent 150e293 commit cdfbcc2
Show file tree
Hide file tree
Showing 13 changed files with 827 additions and 10 deletions.
6 changes: 6 additions & 0 deletions warehouse/assemble/datawave/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
<groupId>gov.nsa.datawave</groupId>
<artifactId>datawave-ingest-scripts</artifactId>
</dependency>
<dependency>
<groupId>gov.nsa.datawave</groupId>
<artifactId>datawave-ingest-ssdeep</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>gov.nsa.datawave</groupId>
<artifactId>datawave-ingest-wikipedia</artifactId>
Expand Down Expand Up @@ -672,6 +677,7 @@
<include>${project.groupId}:datawave-edge-model-configuration-core</include>
<include>${project.groupId}:datawave-ingest-wikipedia</include>
<include>${project.groupId}:datawave-ingest-nyctlc</include>
<include>${project.groupId}:datawave-ingest-ssdeep</include>
<include>gov.nsa.datawave.microservice:accumulo-utils</include>
<include>gov.nsa.datawave.microservice:metadata-utils</include>
<include>gov.nsa.datawave.microservice:type-utils</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ DATAWAVE_INGEST_CSV_JAR=$(findJar datawave-ingest-csv)
DATAWAVE_INGEST_JSON_JAR=$(findJar datawave-ingest-json)
DATAWAVE_INGEST_WIKIPEDIA_JAR=$(findJar datawave-ingest-wikipedia)
DATAWAVE_INGEST_NYCTLC_JAR=$(findJar datawave-ingest-nyctlc)
DATAWAVE_INGEST_SSDEEP_JAR=$(findJar datawave-ingest-ssdeep)
DATAWAVE_INGEST_CORE_JAR=$(findJar datawave-ingest-core)
DATAWAVE_INGEST_CONFIG_JAR=$(findJar datawave-ingest-configuration)
DATAWAVE_COMMON_JAR=$(findJar datawave-common)
DATAWAVE_COMMON_SSDEEP_JAR=$(findJar datawave-ssdeep-common)
DATAWAVE_ACCUMULO_EXTENSIONS_JAR=$(findJar datawave-accumulo-extensions)
DATAWAVE_METRICS_CORE_JAR=$(findJar datawave-metrics-core)
DATAWAVE_METADATA_UTILS_JAR=$(findJar metadata-utils)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ CLASSPATH=${CLASSPATH}:${DATAWAVE_ACCUMULO_UTILS_JAR}
CLASSPATH=${CLASSPATH}:${INMEMORY_ACCUMULO_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_BASE_REST_RESPONSES_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_COMMON_UTILS_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_COMMON_SSDEEP_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INDEX_STATS_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_CORE_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_CONFIG_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_CSV_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_JSON_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_WIKIPEDIA_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_NYCTLC_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_INGEST_SSDEEP_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_METADATA_UTILS_JAR}
CLASSPATH=${CLASSPATH}:${DATAWAVE_TYPE_UTILS_JAR}
CLASSPATH=${CLASSPATH}:${CURATOR_FRAMEWORK_JAR}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package datawave.query.tables.ssdeep;

import datawave.query.discovery.DiscoveredThing;

/**
 * Associates a single scored ssdeep similarity match with the discovery data retrieved for the
 * matching hash. Instances are immutable once created.
 */
public class DiscoveredSSDeep {
    /** A scored match between two ssdeep hashes, output by the SSDeep similarity query logic */
    public final ScoredSSDeepPair scoredSSDeepPair;
    /** The discovered information about the matching SSDeep hash */
    public final DiscoveredThing discoveredThing;

    /**
     * @param pair
     *            the scored similarity match between the query hash and the matched hash
     * @param thing
     *            the discovery information found for the matched hash
     */
    public DiscoveredSSDeep(ScoredSSDeepPair pair, DiscoveredThing thing) {
        this.scoredSSDeepPair = pair;
        this.discoveredThing = thing;
    }

    /** @return the scored similarity match this discovery entry was produced for */
    public ScoredSSDeepPair getScoredSSDeepPair() {
        return this.scoredSSDeepPair;
    }

    /** @return the discovery information about the matched ssdeep hash */
    public DiscoveredThing getDiscoveredThing() {
        return this.discoveredThing;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package datawave.query.tables.ssdeep;

import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.log4j.Logger;

import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;

import datawave.query.discovery.DiscoveredThing;
import datawave.query.tables.chained.strategy.FullChainStrategy;
import datawave.webservice.query.Query;
import datawave.webservice.query.QueryImpl;
import datawave.webservice.query.logic.QueryLogic;

/**
 * A chain strategy that is designed to first run a ssdeep similarity query and then run a subsequent discovery query for each matching ssdeep hash found by
 * that similarity query. Effectively allows the user to discover information related to hashes that are similar to one or more query hashes.
 */
public class FullSSDeepDiscoveryChainStrategy extends FullChainStrategy<ScoredSSDeepPair,DiscoveredSSDeep> {
    private static final Logger log = Logger.getLogger(FullSSDeepDiscoveryChainStrategy.class);

    // Maps each matched ssdeep hash (case-insensitively) to the scored pairs from the similarity
    // query that produced it. Populated in buildLatterQuery() and read in runChainedQuery().
    // NOTE(review): this field makes the strategy stateful — confirm a fresh instance is used per
    // query before sharing an instance across threads.
    private Multimap<String,ScoredSSDeepPair> scoredMatches;

    /**
     * Builds the discovery-stage query from the matching hashes produced by the initial similarity
     * query, capturing the scored matches as a side effect so they can later be re-attached to the
     * discovery results.
     */
    @Override
    protected Query buildLatterQuery(Query initialQuery, Iterator<ScoredSSDeepPair> initialQueryResults, String latterLogicName) {
        log.debug("buildLatterQuery() called...");

        // track the scored matches we've seen while traversing the initial query results.
        // this has to be case-insensitive because the CHECKSUM_SSDEEP index entries are most likely downcased.
        scoredMatches = TreeMultimap.create(String.CASE_INSENSITIVE_ORDER, ScoredSSDeepPair.NATURAL_ORDER);

        String queryString = captureScoredMatchesAndBuildQuery(initialQueryResults, scoredMatches);

        Query q = new QueryImpl(); // TODO, need to use a factory? don't hardcode this.
        q.setQuery(queryString);
        q.setId(UUID.randomUUID());
        q.setPagesize(Integer.MAX_VALUE); // TODO: choose something reasonable.
        q.setQueryAuthorizations(initialQuery.getQueryAuthorizations());
        q.setUserDN(initialQuery.getUserDN());
        return q;
    }

    @Override
    public Iterator<DiscoveredSSDeep> runChainedQuery(AccumuloClient client, Query initialQuery, Set<Authorizations> auths,
                    Iterator<ScoredSSDeepPair> initialQueryResults, QueryLogic<DiscoveredSSDeep> latterQueryLogic) throws Exception {
        final Iterator<DiscoveredSSDeep> it = super.runChainedQuery(client, initialQuery, auths, initialQueryResults, latterQueryLogic);

        // Create a defensive copy of the score map because stream evaluation may be delayed.
        final Multimap<String,ScoredSSDeepPair> localScoredMatches = TreeMultimap.create(String.CASE_INSENSITIVE_ORDER, ScoredSSDeepPair.NATURAL_ORDER);
        localScoredMatches.putAll(scoredMatches);

        return getEnrichedDiscoveredSSDeepIterator(it, localScoredMatches);
    }

    /**
     * Consumes the initial similarity-query results, recording each scored pair in {@code scoredMatches},
     * and builds the OR'd CHECKSUM_SSDEEP discovery query over the distinct matching hashes.
     *
     * @param initialQueryResults
     *            an iterator of scored ssdeep pairs that represent the results of the initial ssdeep similarity query.
     * @param scoredMatches
     *            used to capture the scored matches contained within the initialQueryResults
     * @return the query string for the next stage of the query.
     */
    public static String captureScoredMatchesAndBuildQuery(Iterator<ScoredSSDeepPair> initialQueryResults,
                    final Multimap<String,ScoredSSDeepPair> scoredMatches) {
        // extract the matched ssdeeps from the query results and generate the discovery query.
        // The filter relies on Multimap.put returning true only when the map changed, so duplicate
        // (hash, pair) entries are both recorded once and dropped from the stream in one step.
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(initialQueryResults, Spliterator.ORDERED), false)
                        .filter(queryResult -> scoredMatches.put(queryResult.getMatchingHash().toString(), queryResult))
                        .map(queryResult -> queryResult.getMatchingHash().toString()).distinct().peek(ssdeep -> log.debug("Added new ssdeep " + ssdeep))
                        .map(ssdeep -> "CHECKSUM_SSDEEP:\"" + ssdeep + "\"").collect(Collectors.joining(" OR ", "", ""));
    }

    /**
     * Given an iterator of DiscoveredSSDeep objects that have no matching query or weighted score, lookup the potential queries that returned them and the
     * weighted score associated with that query and use them to produce enriched results.
     *
     * @param resultsIterator
     *            an iterator of unenriched DiscoveredSSDeep's that don't have query or score info.
     * @param scoredMatches
     *            the collection of matching hashes and the original queries that led them to be returned.
     * @return an iterator of DiscoveredSSDeep's enriched with the queries that returned them.
     */
    public static Iterator<DiscoveredSSDeep> getEnrichedDiscoveredSSDeepIterator(Iterator<DiscoveredSSDeep> resultsIterator,
                    final Multimap<String,ScoredSSDeepPair> scoredMatches) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultsIterator, Spliterator.ORDERED), false)
                        .flatMap(discoveredSSdeep -> enrichDiscoveredSSDeep(discoveredSSdeep, scoredMatches)).iterator();
    }

    /**
     * Given a single discovered ssdeep, use the scoredMatches map to determine which queries it is related to. This will return zero to many new
     * DiscoveredSSDeep entries for each query that the matching ssdeep hash appeared in.
     *
     * @param discoveredSSDeep
     *            the ssdeep discovery information for a single matched hash
     * @param scoredMatches
     *            the set of scored matches from the ssdeep similarity logic, used to look up score and query info for the matched hash.
     * @return a stream of DiscoveredSSDeep objects that align discovery information with the original query hashes.
     */
    public static Stream<DiscoveredSSDeep> enrichDiscoveredSSDeep(DiscoveredSSDeep discoveredSSDeep, final Multimap<String,ScoredSSDeepPair> scoredMatches) {
        final DiscoveredThing discoveredThing = discoveredSSDeep.getDiscoveredThing();
        // The discovered term is the matched hash; every scored pair recorded for it yields one
        // enriched result, so a hash matched by several query hashes fans out to several entries.
        final String term = discoveredThing.getTerm();
        return scoredMatches.get(term).stream().map(scoredPair -> new DiscoveredSSDeep(scoredPair, discoveredThing));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package datawave.query.tables.ssdeep;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.log4j.Logger;

import datawave.query.tables.chained.ChainedQueryTable;
import datawave.webservice.query.Query;
import datawave.webservice.query.configuration.GenericQueryConfiguration;
import datawave.webservice.query.logic.QueryLogicTransformer;

/**
 * Implements a ChainedQueryTable that will first use the SSDeepSimilarityQueryLogic to find similar hashes for a set of query hashes and then run the
 * SSDeepDiscoveryQueryLogic to retrieve discovery info for those matched hashes.
 */
public class SSDeepChainedDiscoveryQueryLogic extends ChainedQueryTable<ScoredSSDeepPair,DiscoveredSSDeep> {

    private static final Logger log = Logger.getLogger(SSDeepChainedDiscoveryQueryLogic.class);

    // Settings for the latter (discovery) stage, duplicated from the caller's settings in initialize().
    private Query discoveryQuery = null;

    public SSDeepChainedDiscoveryQueryLogic() {
        super();
    }

    // discoveryQuery is intentionally not copied: it is per-execution state re-created in initialize().
    @SuppressWarnings("CopyConstructorMissesField")
    public SSDeepChainedDiscoveryQueryLogic(SSDeepChainedDiscoveryQueryLogic other) {
        super(other);
    }

    @Override
    public void close() {
        super.close();
    }

    /**
     * Initializes the first (similarity) query logic and duplicates the caller's settings for use by the
     * subsequent discovery stage in setupQuery().
     *
     * @param client
     *            the Accumulo client used to run the queries
     * @param settings
     *            the caller's query settings; duplicated for the discovery stage
     * @param auths
     *            the authorizations the query will run with
     * @return the configuration produced by initializing the similarity (first) logic
     * @throws Exception
     *             if the underlying logic fails to initialize
     */
    @Override
    public GenericQueryConfiguration initialize(AccumuloClient client, Query settings, Set<Authorizations> auths) throws Exception {
        super.initialize(client, settings, auths);
        this.discoveryQuery = settings.duplicate(settings.getQueryName() + "_discovery_query");

        log.debug("Initial settings parameters: " + settings.getParameters().toString());
        return this.logic1.initialize(client, settings, auths);
    }

    /**
     * Runs the similarity (first) logic and hands its results to the chain strategy, which executes the
     * discovery (second) logic and exposes the combined results through {@code this.iterator}.
     *
     * @param config
     *            the configuration produced by initialize()
     * @throws RuntimeException
     *             if no chain strategy has been configured
     * @throws Exception
     *             if either stage of the chained query fails
     */
    @Override
    public void setupQuery(GenericQueryConfiguration config) throws Exception {
        if (null == this.getChainStrategy()) {
            final String error = "No transformed ChainStrategy provided for SSDeepChainedDiscoveryQueryLogic!";
            log.error(error);
            throw new RuntimeException(error);
        }

        log.info("Setting up ssdeep query using config");
        this.logic1.setupQuery(config);

        final Iterator<ScoredSSDeepPair> iter1 = this.logic1.iterator();

        log.info("Running chained discovery query");
        this.iterator = this.getChainStrategy().runChainedQuery(config.getClient(), this.discoveryQuery, config.getAuthorizations(), iter1, this.logic2);
    }

    /** Results are shaped by the discovery (second) stage, so delegate transformation to logic2. */
    @Override
    public QueryLogicTransformer getTransformer(Query settings) {
        return this.logic2.getTransformer(settings);
    }

    @Override
    public SSDeepChainedDiscoveryQueryLogic clone() throws CloneNotSupportedException {
        return new SSDeepChainedDiscoveryQueryLogic(this);
    }

    public Set<String> getExampleQueries() {
        return Collections.emptySet();
    }

}
Loading

0 comments on commit cdfbcc2

Please sign in to comment.