Skip to content

Commit

Permalink
Add support for splitting rollup queries (#1853)
Browse files Browse the repository at this point in the history
* Add an SLA config flag for rollup intervals

Adds a configuration option for rollup intervals to specify their
maximum acceptable delay. Queries that cover a time between now and that
maximum delay will need to query other tables for that time interval.

* Add global config flag to enable splitting queries

Adds a global config flag to enable splitting queries that would hit the
rollup table, but the rollup table has a delay SLA configured.
In that case, this feature allows splitting a query in two: one that
gets the data from the rollups table until the time where it's
guaranteed to be available, and the rest from the raw table.

* Add a new SplitRollupQuery

Adds a SplitRollupQuery class that supports splitting a rollup query into
two separate queries.
This is useful for when a rollup table is filled by e.g. a batch job
that processes the data from the previous day on a daily basis. Rollup
data for yesterday will then only be available some time today. This
delay SLA can be configured on a per-table basis. The delay would
specify by how much time the table can be behind real time.

If a query comes in that would query data from that blackout period
where data is only available in the raw table, but not yet guaranteed to
be in the rollup table, the incoming query can be split into two using
the SplitRollupQuery class. It wraps one query that reads from the rollup
table up to the last timestamp guaranteed to be available according to the
SLA, and a second query that gets the remaining data from the raw table.

* Extract an AbstractQuery

Extracts an AbstractQuery from the TsdbQuery implementation since we'd
like to reuse some parts of it in other Query classes (in this case
SplitRollupQuery)

* Extract an AbstractSpanGroup

* Avoid NullPointerException when setting start time

Avoids a NullPointerException that happened when we were trying to set
the start time on a query that would be eligible to split, but due to
the SLA config only hit the raw table anyway.

* Scale timestamps to milliseconds for split queries

Scales all timestamps for split queries to milliseconds. It's important
to maintain consistent units between all the partial queries that make
up the bigger one.

* Fix starting time error for split queries

Fixes a bug that would happen when the start time of a query aligns
perfectly with the time configured in the SLA for the delay of a rollup
table.
For a defined SLA, e.g. 1 day, if the start time of the query was
exactly 1 day ago, the end time of the rollups part of the query would
be updated and then be equal to its start time. That isn't allowed and
causes a query exception.
  • Loading branch information
muffix authored and johann8384 committed Dec 12, 2024
1 parent 08ea306 commit 93867ca
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/core/SplitRollupQuery.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This file is part of OpenTSDB.
// Copyright (C) 2021 The OpenTSDB Authors.
// Copyright (C) 2010-2021 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
Expand Down
3 changes: 1 addition & 2 deletions src/core/SplitRollupSpanGroup.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This file is part of OpenTSDB.
// Copyright (C) 2021 The OpenTSDB Authors.
// Copyright (C) 2012-2021 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
Expand Down Expand Up @@ -119,7 +119,6 @@ public Map<String, String> call(ArrayList<Map<String, String>> resolvedTags) thr
*/
@Override
public Bytes.ByteMap<byte[]> getTagUids() {
Bytes.ByteMap<byte[]> tagUids = new Bytes.ByteMap<byte[]>();

for (SpanGroup group : spanGroups) {
tagUids.putAll(group.getTagUids());
Expand Down
100 changes: 50 additions & 50 deletions src/core/TsdbQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,22 @@ final class TsdbQuery extends AbstractQuery {

/** End time (UNIX timestamp in seconds) on 32 bits ("unsigned" int). */
private long end_time = UNSET;

/** Whether or not to delete the queried data */
private boolean delete;

/** ID of the metric being looked up. */
private byte[] metric;

/** Row key regex to pass to HBase if we have tags or TSUIDs */
private String regex;

/** Whether or not to enable the fuzzy row filter for Hbase */
private boolean enable_fuzzy_filter;

/** Whether or not the user wants to use the fuzzy filter */
private boolean override_fuzzy_filter;

/**
* Tags by which we must group the results.
* Each element is a tag ID.
Expand All @@ -132,7 +132,7 @@ final class TsdbQuery extends AbstractQuery {

/** Specifies the various options for rate calculations */
private RateOptions rate_options;

/** Aggregator function to use. */
private Aggregator aggregator;

Expand All @@ -141,55 +141,55 @@ final class TsdbQuery extends AbstractQuery {

/** Rollup interval and aggregator, null if not applicable. */
private RollupQuery rollup_query;

/** Map of RollupInterval objects in the order of next best match
* like 1d, 1h, 10m, 1m, for rollup of 1d. */
private List<RollupInterval> best_match_rollups;

/** How to use the rollup data */
private ROLLUP_USAGE rollup_usage = ROLLUP_USAGE.ROLLUP_NOFALLBACK;
/** Search the query on pre-aggregated table directly instead of post fetch

/** Search the query on pre-aggregated table directly instead of post fetch
* aggregation. */
private boolean pre_aggregate;

/** Optional list of TSUIDs to fetch and aggregate instead of a metric */
private List<String> tsuids;

/** An index that links this query to the original sub query */
private int query_index;

/** Tag value filters to apply post scan */
private List<TagVFilter> filters;

/** An object for storing stats in regarding the query. May be null */
private QueryStats query_stats;

/** Whether or not to match series with ONLY the given tags */
private boolean explicit_tags;

private List<Float> percentiles;

private boolean show_histogram_buckets;

/** Set at filter resolution time to determine if we can use multi-gets */
private boolean use_multi_gets;

/** Set by the user if they want to bypass multi-gets */
private boolean override_multi_get;

/** Whether or not to use the search plugin for multi-get resolution. */
private boolean multiget_with_search;

/** Whether or not to fall back on query failure. */
private boolean search_query_failure;

/** The maximum number of bytes allowed per query. */
private long max_bytes = 0;

/** The maximum number of data points allowed per query. */
private long max_data_points = 0;

/**
* Enum for rollup fallback control.
* @since 2.4
Expand All @@ -199,15 +199,15 @@ public static enum ROLLUP_USAGE {
ROLLUP_NOFALLBACK, //Use rollup data, and don't fallback on no data
ROLLUP_FALLBACK, //Use rollup data and fallback to next best match on data
ROLLUP_FALLBACK_RAW; //Use rollup data and fallback to raw on no data

/**
* Parse and transform a string to ROLLUP_USAGE object
* @param str String to be parsed
* @return enum param tells how to use the rollup data
*/
public static ROLLUP_USAGE parse(String str) {
ROLLUP_USAGE def = ROLLUP_NOFALLBACK;

if (str != null) {
try {
def = ROLLUP_USAGE.valueOf(str.toUpperCase());
Expand All @@ -217,10 +217,10 @@ public static ROLLUP_USAGE parse(String str) {
+ "uses raw data but don't fallback on no data");
}
}

return def;
}

/**
* Whether to fallback to next best match or raw
* @return true means fall back else false
Expand Down Expand Up @@ -249,15 +249,15 @@ public String getRollupTable() {
return "raw";
}
}
/** Search the query on pre-aggregated table directly instead of post fetch
* aggregation.
* @since 2.4

/** Search the query on pre-aggregated table directly instead of post fetch
* aggregation.
* @since 2.4
*/
public boolean isPreAggregate() {
return this.pre_aggregate;
}

/**
* Sets the start time for the query
* @param timestamp Unix epoch timestamp in seconds or milliseconds
Expand All @@ -266,7 +266,7 @@ public boolean isPreAggregate() {
*/
@Override
public void setStartTime(final long timestamp) {
if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 &&
if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 &&
timestamp > 9999999999999L)) {
throw new IllegalArgumentException("Invalid timestamp: " + timestamp);
} else if (end_time != UNSET && timestamp > getEndTime()) {
Expand Down Expand Up @@ -1464,8 +1464,8 @@ protected Scanner getScanner(final int salt_bucket) throws HBaseException {
final Scanner scanner = QueryUtil.getMetricScanner(tsdb, salt_bucket, metric,
(int) getScanStartTimeSeconds(), end_time == UNSET
? -1 // Will scan until the end (0xFFF...).
: (int) getScanEndTimeSeconds(),
tableToBeScanned(),
: (int) getScanEndTimeSeconds(),
tableToBeScanned(),
TSDB.FAMILY());
if(tsdb.getConfig().use_otsdb_timestamp()) {
long stTime = (getScanStartTimeSeconds() * 1000);
Expand Down Expand Up @@ -1498,7 +1498,7 @@ protected Scanner getScanner(final int salt_bucket) throws HBaseException {
new BinaryPrefixComparator(rollup_query.getRollupAgg().toString()
.getBytes(Const.ASCII_CHARSET))));
rollup_filters.add(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryPrefixComparator(new byte[] {
new BinaryPrefixComparator(new byte[] {
(byte) tsdb.getRollupConfig().getIdForAggregator(
rollup_query.getRollupAgg().toString())
})));
Expand All @@ -1510,7 +1510,7 @@ protected Scanner getScanner(final int salt_bucket) throws HBaseException {
new BinaryPrefixComparator(rollup_query.getRollupAgg().toString()
.getBytes(Const.ASCII_CHARSET))));
filters.add(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryPrefixComparator(new byte[] {
new BinaryPrefixComparator(new byte[] {
(byte) tsdb.getRollupConfig().getIdForAggregator(
rollup_query.getRollupAgg().toString())
})));
Expand All @@ -1527,10 +1527,10 @@ protected Scanner getScanner(final int salt_bucket) throws HBaseException {
(byte) tsdb.getRollupConfig().getIdForAggregator("sum")
})));
filters.add(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryPrefixComparator(new byte[] {
new BinaryPrefixComparator(new byte[] {
(byte) tsdb.getRollupConfig().getIdForAggregator("count")
})));

if (existing != null) {
final List<ScanFilter> combined = new ArrayList<ScanFilter>(2);
combined.add(existing);
Expand All @@ -1545,14 +1545,14 @@ protected Scanner getScanner(final int salt_bucket) throws HBaseException {
}

/**
* Identify the table to be scanned based on the roll up and pre-aggregate
* Identify the table to be scanned based on the roll up and pre-aggregate
* query parameters
* @return table name as byte array
* @since 2.4
*/
private byte[] tableToBeScanned() {
final byte[] tableName;

if (RollupQuery.isValidQuery(rollup_query)) {
if (pre_aggregate) {
tableName= rollup_query.getRollupInterval().getGroupbyTable();
Expand All @@ -1567,10 +1567,10 @@ else if (pre_aggregate) {
else {
tableName = tsdb.dataTable();
}

return tableName;
}

/** Returns the UNIX timestamp from which we must start scanning. */
long getScanStartTimeSeconds() {
// Begin with the raw query start time.
Expand All @@ -1580,15 +1580,15 @@ long getScanStartTimeSeconds() {
if ((start & Const.SECOND_MASK) != 0L) {
start /= 1000L;
}

// if we have a rollup query, we have different row key start times so find
// the base time from which we need to search
if (rollup_query != null) {
long base_time = RollupUtils.getRollupBasetime(start,
long base_time = RollupUtils.getRollupBasetime(start,
rollup_query.getRollupInterval());
if (rate) {
// scan one row back so we can get the first rate value.
base_time = RollupUtils.getRollupBasetime(base_time - 1,
base_time = RollupUtils.getRollupBasetime(base_time - 1,
rollup_query.getRollupInterval());
}
return base_time;
Expand Down Expand Up @@ -1627,11 +1627,11 @@ long getScanEndTimeSeconds() {
end++;
}
}

if (rollup_query != null) {
return RollupUtils.getRollupBasetime(end +
(rollup_query.getRollupInterval().getIntervalSeconds() *
rollup_query.getRollupInterval().getIntervals()),
return RollupUtils.getRollupBasetime(end +
(rollup_query.getRollupInterval().getIntervalSeconds() *
rollup_query.getRollupInterval().getIntervals()),
rollup_query.getRollupInterval());
}

Expand Down
2 changes: 1 addition & 1 deletion test/core/TestSplitRollupQuery.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This file is part of OpenTSDB.
// Copyright (C) 2021 The OpenTSDB Authors.
// Copyright (C) 2012-2021 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
Expand Down

0 comments on commit 93867ca

Please sign in to comment.