Skip to content

Commit

Permalink
Add WritesQueryMetrics to extended class EventQueryTransformerSupport…
Browse files Browse the repository at this point in the history
… (ref: DocumentTransformSupport.java)
  • Loading branch information
SethSmucker committed Dec 5, 2024
1 parent 95cd81e commit 7d8a6c0
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import datawave.marking.MarkingFunctions;

public abstract class BaseQueryLogicTransformer<I,O> extends AbstractQueryLogicTransformer<I,O> implements QueryLogicTransformer<I,O> {
public abstract class BaseQueryLogicTransformer<I,O> extends AbstractQueryLogicTransformer<I,O> implements QueryLogicTransformer<I,O> {

protected MarkingFunctions markingFunctions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map.Entry;
import java.util.Set;

import datawave.core.query.logic.QueryLogicTransformer;
import datawave.core.query.logic.WritesQueryMetrics;
import datawave.microservice.querymetric.BaseQueryMetric;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -152,49 +153,4 @@ public EventBase transform(Entry<Key,Value> entry) {

return event;
}

@Override
public void writeQueryMetrics(BaseQueryMetric metric) {

}

@Override
public boolean hasMetrics() {
return false;
}

@Override
public long getSourceCount() {
return 0;
}

@Override
public long getNextCount() {
return 0;
}

@Override
public long getSeekCount() {
return 0;
}

@Override
public long getYieldCount() {
return 0;
}

@Override
public long getDocRanges() {
return 0;
}

@Override
public long getFiRanges() {
return 0;
}

@Override
public void resetMetrics() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@
import java.util.Set;
import java.util.TreeSet;

import datawave.core.query.exception.EmptyObjectException;
import datawave.core.query.logic.WritesQueryMetrics;
import datawave.microservice.querymetric.BaseQueryMetric;
import datawave.query.attributes.Attribute;
import datawave.query.attributes.Document;
import datawave.query.attributes.TimingMetadata;
import datawave.query.function.LogTiming;
import datawave.query.iterator.QueryOptions;
import datawave.query.iterator.profile.QuerySpan;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
Expand All @@ -35,7 +44,7 @@
import datawave.webservice.result.BaseQueryResponse;
import datawave.webservice.result.EventQueryResponseBase;

public abstract class EventQueryTransformerSupport<I,O> extends BaseQueryLogicTransformer<I,O> implements CacheableLogic {
public abstract class EventQueryTransformerSupport<I,O> extends BaseQueryLogicTransformer<I,O> implements CacheableLogic, WritesQueryMetrics {

protected EventFields eventFields = new EventFields();

Expand All @@ -57,12 +66,24 @@ public abstract class EventQueryTransformerSupport<I,O> extends BaseQueryLogicTr
protected String tableName;
protected ResponseObjectFactory responseObjectFactory;

private long sourceCount = 0;
private long nextCount = 0;
private long seekCount = 0;
private long yieldCount = 0L;
private long docRanges = 0;
private long fiRanges = 0;
private boolean logTimingDetails = false;

public EventQueryTransformerSupport(String tableName, Query settings, MarkingFunctions markingFunctions, ResponseObjectFactory responseObjectFactory) {
super(markingFunctions);
this.settings = settings;
this.auths = new Authorizations(settings.getQueryAuthorizations().split(","));
this.tableName = tableName;
this.responseObjectFactory = responseObjectFactory;
String logTimingDetailsStr = settings.findParameter(QueryOptions.LOG_TIMING_DETAILS).getParameterValue().trim();
if (org.apache.commons.lang.StringUtils.isNotBlank(logTimingDetailsStr)) {
logTimingDetails = Boolean.parseBoolean(logTimingDetailsStr);
}
}

public EventQueryTransformerSupport(BaseQueryLogic<Entry<Key,Value>> logic, Query settings, MarkingFunctions markingFunctions,
Expand Down Expand Up @@ -216,4 +237,104 @@ public void setQm(QueryModel qm) {
this.qm = qm;
}

protected void extractMetrics(Document document, Key documentKey) {

Map<String, Attribute<? extends Comparable<?>>> dictionary = document.getDictionary();
Attribute<? extends Comparable<?>> timingMetadataAttribute = dictionary.get(LogTiming.TIMING_METADATA);
if (timingMetadataAttribute != null && timingMetadataAttribute instanceof TimingMetadata) {
TimingMetadata timingMetadata = (TimingMetadata) timingMetadataAttribute;
long currentSourceCount = timingMetadata.getSourceCount();
long currentNextCount = timingMetadata.getNextCount();
long currentSeekCount = timingMetadata.getSeekCount();
long currentYieldCount = timingMetadata.getYieldCount();
String host = timingMetadata.getHost();
sourceCount += currentSourceCount;
nextCount += currentNextCount;
seekCount += currentSeekCount;
yieldCount += currentYieldCount;
Map<String,Long> stageTimers = timingMetadata.getStageTimers();
if (stageTimers.containsKey(QuerySpan.Stage.DocumentSpecificTree.toString())) {
docRanges++;
} else if (stageTimers.containsKey(QuerySpan.Stage.FieldIndexTree.toString())) {
fiRanges++;
}

if (logTimingDetails || log.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("retrieved document from host:").append(host).append(" at key:").append(documentKey.toStringNoTime()).append(" stageTimers:")
.append(stageTimers);
sb.append(" sourceCount:").append(currentSourceCount).append(" nextCount:").append(currentNextCount).append(" seekCount:")
.append(currentSeekCount).append(" yieldCount:").append(currentYieldCount);
if (log.isTraceEnabled()) {
log.trace(sb.toString());
} else {
log.info(sb.toString());
}
}
if (dictionary.size() == 1) {
// this document contained only timing metadata
throw new EmptyObjectException();
}
}
}

@Override
public boolean hasMetrics() {
return sourceCount + nextCount + seekCount + yieldCount + docRanges + fiRanges > 0;
}

@Override
public long getSourceCount() {
return sourceCount;
}

@Override
public long getNextCount() {
return nextCount;
}

@Override
public long getSeekCount() {
return seekCount;
}

@Override
public long getYieldCount() {
return yieldCount;
}

@Override
public long getDocRanges() {
return docRanges;
}

@Override
public long getFiRanges() {
return fiRanges;
}

@Override
public void writeQueryMetrics(BaseQueryMetric metric) {

// if any timing details have been returned, add metrics
if (hasMetrics()) {
metric.setSourceCount(sourceCount);
metric.setNextCount(nextCount);
metric.setSeekCount(seekCount);
metric.setYieldCount(yieldCount);
metric.setDocRanges(docRanges);
metric.setFiRanges(fiRanges);
}
}

@Override
public void resetMetrics() {
sourceCount = 0;
nextCount = 0;
seekCount = 0;
yieldCount = 0;
docRanges = 0;
fiRanges = 0;
}

}

0 comments on commit 7d8a6c0

Please sign in to comment.