ML: Add support for rollup Indexes in Datafeeds #34654
Conversation
Pinging @elastic/ml-core
Jenkins retest this please
One current issue is that a "rollup index ONLY" datafeed does not work; I am looking into that. Rollups don't really have timestamp fields, so I'm unsure how to reconcile that with needing to support rollup-only lookbacks.
Leaving some of the comments I had before you park this.
@@ -43,32 +44,33 @@
private static ParseField INDEX_PATTERN = new ParseField("index_pattern");
private static ParseField FIELDS = new ParseField("fields");

private final RollupJobConfig rollupJobConfig;
This is missing from serialization (both wire and xContent). Is that by design?
DataExtractorFactory.create(client,
    previewDatafeed.build(),
    job,
    new Auditor(client, clusterService.nodeName()),
You should be able to inject the auditor in the action, store it as a member and use that instead of constructing a new one.
response -> {
    if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
        if (datafeed.hasAggregations()) {
            auditor.info(job.getId(), "Creating aggregated data extractor for datafeed [" + datafeed.getId() + "]");
Why are those messages audited? They look more suitable for logging rather than auditing. The auditor will create job notifications and the user will see those in the job messages tab in the UI. We've been keeping those messages at a higher level for operational awareness. The type of extractor seems like a technical detail the user shouldn't really be aware of. That is my take at least, but I'm open for discussion if I missed something.
GetRollupIndexCapsAction.Request request = new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]));

ClientHelper.<GetRollupIndexCapsAction.Request, GetRollupIndexCapsAction.Response, GetRollupIndexCapsAction.RequestBuilder>
You probably don't need the explicit type parameters here, which would make this line more readable.
if (datafeed.hasAggregations()) { // Rollup indexes require aggregations
    RollupDataExtractorFactory.create(client, datafeed, job, response.getJobs(), factoryHandler);
} else {
    throw new IllegalArgumentException("Aggregations are required when using Rollup indices");
Use the listener instead?: factoryHandler.onFailure(new IllegalArgumentException(...));
There is a Messages class which centrally bundles all messages as constants; it might be good to use it here.
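A minimal sketch of the suggested pattern, using a hypothetical Listener interface and message constant in place of Elasticsearch's ActionListener and Messages class (all names here are illustrative, not the real API):

```java
// Hypothetical stand-in for Elasticsearch's ActionListener; illustrative only.
interface Listener<T> {
    void onResponse(T result);
    void onFailure(Exception e);
}

class ExtractorFactorySketch {
    // Illustrative constant, mirroring the suggestion to centralise
    // user-facing strings in a shared Messages class.
    static final String ROLLUP_REQUIRES_AGGS = "Aggregations are required when using Rollup indices";

    static void create(boolean hasAggregations, Listener<String> factoryHandler) {
        if (hasAggregations) {
            factoryHandler.onResponse("rollup-extractor");
        } else {
            // Report the error through the listener rather than throwing,
            // so the failure propagates through the async callback chain.
            factoryHandler.onFailure(new IllegalArgumentException(ROLLUP_REQUIRES_AGGS));
        }
    }
}
```

The point of the pattern is that a throw from inside an async callback may never reach the caller, whereas onFailure always does.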
@@ -424,7 +424,7 @@ private TimeValue defaultFrequencyTarget(TimeValue bucketSpan) {

 private static final TimeValue MIN_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1);
 private static final TimeValue MAX_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(2);
-private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;
+public static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;
nit: push it to the top now that it is public
@Override
public void cancel() {
    LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
nit: debug?
This was trace originally, but I agree it is worth taking the chance and changing it to debug.
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps.RollupFieldCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.joda.time.DateTimeZone;
There is a push to move away from Joda; is it possible to use java.time instead?
...va/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java
private static boolean hasAggregations(ParsedRollupCaps rollupCaps, List<AggregationBuilder> datafeedAggregations) {
    for (AggregationBuilder aggregationBuilder : datafeedAggregations) {
        String type = aggregationBuilder.getType();
        String field = ((ValuesSourceAggregationBuilder) aggregationBuilder).field();
I think it would be cleaner if the parameter of this method were already List<ValuesSourceAggregationBuilder>, and if the calling code downcast as soon as it has verified that the AggregationBuilder is a ValuesSourceAggregationBuilder (I assume anything else would be a bug anyway).
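A sketch of the "downcast once at the boundary" idea, with simplified stand-in classes (the real types are Elasticsearch's AggregationBuilder hierarchy; these names are placeholders):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for AggregationBuilder / ValuesSourceAggregationBuilder.
class AggBuilder { }

class ValuesSourceAggBuilder extends AggBuilder {
    final String field;
    ValuesSourceAggBuilder(String field) { this.field = field; }
}

class DowncastSketch {
    // Verify the runtime type once at the boundary, then hand a list of the
    // narrower type to downstream helpers so they need no per-element casts.
    static List<ValuesSourceAggBuilder> verify(List<AggBuilder> aggs) {
        List<ValuesSourceAggBuilder> result = new ArrayList<>();
        for (AggBuilder agg : aggs) {
            if (agg instanceof ValuesSourceAggBuilder == false) {
                throw new IllegalArgumentException("unsupported aggregation type");
            }
            result.add((ValuesSourceAggBuilder) agg);
        }
        return result;
    }
}
```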
This is now blocked due to: #34815
Leaving a first batch of comments. Still working through the last 2/5 of the PR :-)
        listener.onFailure(e);
    }
});
DataExtractorFactory.create(client,
nit: since there is now no change in this file, you could remove the file entirely from the commit.
factory -> listener.onResponse(datafeed.getChunkingConfig().isEnabled()
        ? new ChunkedDataExtractorFactory(client, datafeed, job, factory) : factory)
    , listener::onFailure
factory -> {
this change also seems unnecessary now
 *
 * @param <T> The request builder type for getting data from ElasticSearch
 */
public abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<SearchRequest, SearchResponse>>
make package private?
    return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
}

protected abstract T buildSearchRequest();
template method FTW! :-)
// For derivative aggregations the first bucket will always be null
// so query one extra histogram bucket back and hope there is data
// in that bucket
long histogramSearchStartTime = Math.max(0, context.start - ExtractorUtils.getHistogramIntervalMillis(context.aggs));
Most of this method is still common between the two implementations. There are some flaky bits here, like the -1 histogram bucket for the derivative aggregations; it would be nice to keep those in a single place.
What if we make the abstract method buildSearchRequest(SearchSourceBuilder), so the common logic stays in the super class? We can set the types in the AggregationDataExtractor while we don't do so in the rollup one.
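The suggested refactor is the classic template-method pattern. A minimal, self-contained sketch (class and method names are illustrative stand-ins for the extractor classes, not the real ones):

```java
// Template-method sketch: the superclass owns the shared logic (e.g. the
// extra histogram bucket for derivative aggregations) and each subclass
// customises only the final request-building step.
abstract class ExtractorSketch {
    final long start;
    final long intervalMillis;

    ExtractorSketch(long start, long intervalMillis) {
        this.start = start;
        this.intervalMillis = intervalMillis;
    }

    // Common logic lives here exactly once.
    final String search() {
        // Query one extra histogram bucket back for derivatives,
        // clamped at epoch, as in the original code.
        long searchStart = Math.max(0, start - intervalMillis);
        return buildSearchRequest(searchStart);
    }

    // Each implementation fills in only what differs.
    protected abstract String buildSearchRequest(long searchStart);
}

class PlainExtractorSketch extends ExtractorSketch {
    PlainExtractorSketch(long start, long intervalMillis) { super(start, intervalMillis); }
    protected String buildSearchRequest(long searchStart) { return "search from " + searchStart; }
}

class RollupExtractorSketch extends ExtractorSketch {
    RollupExtractorSketch(long start, long intervalMillis) { super(start, intervalMillis); }
    protected String buildSearchRequest(long searchStart) { return "rollup search from " + searchStart; }
}
```

This way the "flaky bits" are written once and both extractors inherit them.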
        return false;
    }
    try {
        long jobInterval = validateAndGetCalendarInterval(rollupJobGroupConfig.getInterval());
nit: with static methods like this, I find the code more readable when the call is explicit, i.e. ExtractorUtils.validateAndGetCalendarInterval(...), rather than statically imported; it means I don't have to wonder where the method comes from. I suppose if a method were used many times in a file it could be otherwise. Anyway, not something you need to change, just a thought.
private final Set<String> supportedMetrics;
private final Set<String> supportedTerms;
private final Map<String, Object> datehistogramAgg;
private static List<String> aggsToIgnore = Arrays.asList(HistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder.NAME);
make final
private static ParsedRollupCaps fromJobFieldCaps(Map<String, RollupFieldCaps> rollupFieldCaps, String timeField) {
    Map<String, Object> datehistogram = null;
    RollupFieldCaps timeFieldCaps = rollupFieldCaps.get(timeField);
    if ((timeFieldCaps == null) == false) {
simplify: if (timeFieldCaps != null)?
I am fine with that simplification (my preference is to use != rather than == false) but the prevailing pattern I have seen throughout Elastic is == false.
{expression} == false is preferred instead of !{expression}. Other than that, != is used normally.
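A tiny sketch of the convention being discussed (the method and its inputs are invented purely to illustrate the style):

```java
class StyleSketch {
    // Illustrates the Elasticsearch style convention: plain != for
    // null/value comparisons, and "== false" to negate a boolean
    // expression instead of the easy-to-miss "!" prefix.
    static String describe(Object timeFieldCaps, boolean hasDateHistogram) {
        // Preferred: "timeFieldCaps != null" rather than the convoluted
        // "(timeFieldCaps == null) == false" from the original diff.
        if (timeFieldCaps != null && hasDateHistogram == false) {
            return "caps without date histogram";
        }
        return "other";
    }
}
```

The rationale usually given for `== false` is that a lone `!` is easy to overlook when scanning a long condition.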
OK, rest went quickly. Only one comment.
 * @throws IOException when timefield range search fails
 */
private DataSummary buildDataSummary() throws IOException {
    if (context.hasAggregations) {
How about extracting both branches into their own methods here? i.e. createDataSummaryForAggregations and createDataSummaryForScroll.
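A sketch of that extract-method refactor, using the method names from the suggestion; the class name and placeholder bodies are invented, not the real ChunkedDataExtractor logic:

```java
// Each branch of buildDataSummary() becomes a named private method,
// so the top-level method reads as a dispatch on extraction mode.
class DataSummarySketch {
    final boolean hasAggregations;

    DataSummarySketch(boolean hasAggregations) {
        this.hasAggregations = hasAggregations;
    }

    String buildDataSummary() {
        return hasAggregations
            ? createDataSummaryForAggregations()
            : createDataSummaryForScroll();
    }

    private String createDataSummaryForAggregations() { return "aggregation summary"; }
    private String createDataSummaryForScroll() { return "scroll summary"; }
}
```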
For sure.
This is directly blocked by #34831
if (rollupJobGroupConfig.hasDatehistogram() == false) {
    return false;
}
if ("UTC".equalsIgnoreCase(rollupJobGroupConfig.getTimezone()) == false) {
out of curiosity: is that something for a follow-up? If the bucket span is compatible with shifting, it should not be a big deal to support other timezones?
++
I imagine it only really matters for bucket spans of more than 30 minutes. For ML 1 day bucket spans are always 24 hours and UTC, whereas for a date histogram with 1 day buckets (a) they could be offset from the UTC buckets the analytics expects to see and (b) if there is daylight saving then one bucket per year will be 23 hours and one will be 25 hours. Also, for 1 hour bucket spans the offset problem could occur if the timezone is N.5 hours different from UTC, e.g. India at 5.5 hours ahead. But for bucket spans up to and including 30 minutes we should be able to cope with any rollup timezone.
Or am I missing some other problem here?
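The 23/25-hour buckets described above are easy to demonstrate with java.time (a standalone sketch; Europe/London's 2018 DST transition dates are used as example inputs, and the class name is invented):

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class DayLengthSketch {
    // Elapsed hours in one calendar day for the given timezone.
    // On a DST transition day this is 23 or 25, not 24 — which is why
    // 1-day buckets in a non-UTC rollup can disagree with ML's fixed
    // 24-hour UTC bucket spans.
    static long dayLengthHours(LocalDate date, String zone) {
        ZoneId tz = ZoneId.of(zone);
        ZonedDateTime start = date.atStartOfDay(tz);
        ZonedDateTime end = date.plusDays(1).atStartOfDay(tz);
        return Duration.between(start, end).toHours();
    }
}
```

In 2018 UK clocks went forward on 25 March (a 23-hour day) and back on 28 October (a 25-hour day), while every UTC day is exactly 24 hours.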
I think your description is correct @droberts195. We have gone through this before and the restriction of UTC timezone is already in place for aggregations (without rollups). Ben has just copied the validation over to the rollup realm. Since things might go wrong when dealing with non-UTC, imposing the UTC restriction is a good solution. It only requires the user to NOT change the default timezone of the date histogram aggregation. It does not pose any requirements on the actual data and how they're indexed.
OK cool. Let's leave any changes in this area for a separate PR then. Definitely don't make any changes for this in this PR.
Ah, just realised my comment above is confused. This is checking the rollup was in UTC. It is worth finding out what happens if you search a rollup which uses timezone X by aggregating over timezone Y. We need that before making a call on how we deal with this.
LGTM
}

protected SearchResponse executeSearchRequest(T searchRequestBuilder) {
    return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
This will fail if a rollup search has been selected but the user who created the datafeed doesn't have permission to use rollups.
We have to decide how we want to handle this situation better. The two options are:
- If the index pattern includes a rollup index when the datafeed is created and the user does not have permission to run rollup searches then refuse to create the datafeed
- During the check on whether to use a normal search or rollup search, if the user doesn't have permission to run rollup searches then silently fall back to a normal search
I think option 1 is probably better because it avoids the possibility of a datafeed that the user thinks will use rollups silently not working as expected.
To implement that, look in TransportPutDatafeedAction.masterOperation. It currently does a HasPrivilegesRequest to check that the user can search the desired indices. If the index pattern provided includes a rollup index then it should also check privileges for rollup search (normal search privilege is still required as well because rollup search uses that internally).
There is still a theoretical flaw in this, as an index pattern could be provided that does not match a rollup index at the time the datafeed is created, but does later on. For example, suppose a datafeed is created against farequote*. Initially this matches farequote-20181029, farequote-20181030, farequote-20181031, etc. But later it also matches farequote_rollup. With the current logic the datafeed would switch to rollup search at that time, and start to fail if the user did not have permission to use rollup search. But this is an edge case and I'm happy to just cover the more obvious case initially.
@droberts195 how is this behavior any different than when a user creates a datafeed against an index they don't have access to?
The reason I have it the way it is, is behavior parity with how we handle permission issues when reading data from an index.
I will happily change to verify the user has admin privileges for the indices. I would just note that the Rollup folks are working towards simply requiring regular search permissions for _rollup_search, so that it is treated as a simple index read, which would give it parity with other datafeed read actions.
> how is this behavior any different than when a user creates a datafeed in which they don't have access to that specific index?

The problem is that there are many types of "access". Each action can be allowed or disallowed for a particular user. There are index privileges that group together multiple actions. But there's also some overlap in what actions the different privileges allow.

> the Rollup folks are working towards simply requiring regular search permissions for _rollup_search

They'll do this by adjusting the actions that are allowed by the READ_AUTOMATON. But our privileges check in TransportPutDatafeedAction.java is checking specifically for the ability to run the _search action, not for the "read" index privilege. I think we should also check for the ability to use the _rollup_search action at that same point in the code if the index pattern provided includes a rollup index (which will require an extra test to be done early in TransportPutDatafeedAction).
I see what you are saying, @droberts195. I was thinking of the extraction layer, not the creation layer of the datafeed. Will update shortly.
Jenkins retest this please
Jenkins retest this please
LGTM
Actually there's a checkstyle error in the latest commit:
But I'm happy for you to merge this as soon as you get a green PR build.
* Adding rollup support for datafeeds
* Fixing tests and adjusting formatting
* minor formatting change
* fixing some syntax and removing redundancies
* Refactoring and fixing failing test
* Refactoring, adding paranoid null check
* Moving rollup into the aggregation package
* making AggregationToJsonProcessor package private again
* Addressing test failure
* Fixing validations, chunking
* Addressing failing test
* rolling back RollupJobCaps changes
* Adding comment and cleaning up test
* Addressing review comments and test failures
* Moving builder logic into separate methods
* Addressing PR comments, adding test for rollup permissions
* Fixing test failure
* Adding rollup priv check on datafeed put
* Handling missing index when getting caps
* Fixing unused import
This adds rollup index support for data extraction in datafeeds.
There are some changes on the Rollup side of the house, so review from that team may be necessary: RollupSearchAction was changed so that a previously crafted search can be injected via the constructor.