Fix resultcache multiple postaggregation restore #15402

Merged
@@ -62,7 +62,6 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -651,9 +650,7 @@ public ResultRow apply(Object input)
);

if (isResultLevelCache) {
-Iterator<PostAggregator> postItr = query.getPostAggregatorSpecs().iterator();
-int postPos = 0;
-while (postItr.hasNext() && results.hasNext()) {
+for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) {
resultRow.set(postAggregatorStart + postPos, results.next());
}
Contributor

I am not sure whether this can happen, but should we guard against it to be on the safe side?

Suggested change
-for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) {
-resultRow.set(postAggregatorStart + postPos, results.next());
-}
+for (int postPos = 0; postPos < query.getPostAggregatorSpecs().size(); postPos++) {
+if (results.hasNext()) resultRow.set(postAggregatorStart + postPos, results.next());
+}

Member Author

I think the contract here should be to restore the same row that was saved into the cache. If it is not able to do so, I think it should fail with an exception; if it doesn't, wouldn't that cause incorrect results?

Contributor

@LakshSingla, Nov 20, 2023

I was also going through this area for a different purpose, and there is indeed a check below this code that verifies that both iterators have been exhausted completely, so even with the above check the code would fail further down. However, the error message would be more descriptive, so I think we should add the check back; otherwise it would fail with a generic NoSuchElementException.

Member Author

I'm not sure which case you had in mind, but I'm afraid that check might not be very useful here. Consider the following:

  • Suppose we have
    • a results with {1 dim}+{1 aggs}+{1 postagg} = 3
    • a resultRow with {1 dim}+{1 aggs}+{2 postagg} = 4 (for whatever reason)
    • In such a case the system is already facing a serious error, so I'm not sure what value a nicer error message would add; an error like this could not be fixed easily anyway.
  • I think a more reasonable check would be to ensure that results.size() == resultRow.size(), so that we can remove all these iterator checks, which try to do the same thing in a more complicated way.

I've added an if that throws an exception when there is no next element where there should be one, but all these conditionals are somewhat redundant because the iterator was created from a list.
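
The two checks discussed in this thread can be sketched in isolation. This is a minimal illustration only, not the code from the PR: the class `RestoreCheckSketch`, both method names, the exception type, and the error messages are hypothetical, and a plain `Object[]` stands in for `ResultRow`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RestoreCheckSketch
{
  // Hypothetical stand-in for the restore step: fills `count` post-aggregator
  // slots of `row`, starting at `start`, and fails with a descriptive message
  // (instead of a bare NoSuchElementException) if the cached values run out.
  static void restorePostAggs(Object[] row, int start, int count, Iterator<Object> results)
  {
    for (int pos = 0; pos < count; pos++) {
      if (!results.hasNext()) {
        throw new IllegalStateException(
            "Cached row ended after " + pos + " of " + count + " post-aggregator values"
        );
      }
      row[start + pos] = results.next();
    }
  }

  // The alternative raised above: compare the sizes once, up front.
  static void checkSize(List<Object> cached, int expectedSize)
  {
    if (cached.size() != expectedSize) {
      throw new IllegalStateException(
          "Cached row has " + cached.size() + " values, expected " + expectedSize
      );
    }
  }

  public static void main(String[] args)
  {
    Object[] row = new Object[2];
    restorePostAggs(row, 0, 2, Arrays.<Object>asList(10, 20).iterator());
    System.out.println(Arrays.toString(row)); // prints [10, 20]

    try {
      // Only one cached value for two expected post-aggregators.
      checkSize(Arrays.<Object>asList(10), 2);
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Either variant fails loudly on a mismatched row rather than silently leaving a slot unset; the up-front size check is shorter, while the per-element check reports how far the restore got.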

Contributor

@LakshSingla, Nov 21, 2023

https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java#L660
I was referring to this check. It should also handle the case you mentioned above, right? Perhaps adding the size check that you mentioned earlier would be more legible. I am okay with the code either way.

}
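
To see why the removed loop breaks with more than one post-aggregator, here is a standalone sketch of the two loop shapes from the hunk above. It is not Druid code: `PostAggRestoreSketch`, `restoreOld`, and `restoreNew` are hypothetical names, an `Object[]` stands in for `ResultRow`, a list of names stands in for the post-aggregator specs, and the cached list is assumed to hold only the post-aggregator values.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PostAggRestoreSketch
{
  // Same shape as the removed loop: postItr is never advanced and postPos is
  // never incremented, so every cached value is written to the same slot.
  static Object[] restoreOld(List<String> specs, List<Object> cached, int start, int rowSize)
  {
    Object[] row = new Object[rowSize];
    Iterator<String> postItr = specs.iterator();
    Iterator<Object> results = cached.iterator();
    int postPos = 0;
    while (postItr.hasNext() && results.hasNext()) {
      row[start + postPos] = results.next();
    }
    return row;
  }

  // Same shape as the added loop: one slot per post-aggregator spec.
  static Object[] restoreNew(List<String> specs, List<Object> cached, int start, int rowSize)
  {
    Object[] row = new Object[rowSize];
    Iterator<Object> results = cached.iterator();
    for (int postPos = 0; postPos < specs.size(); postPos++) {
      row[start + postPos] = results.next();
    }
    return row;
  }

  public static void main(String[] args)
  {
    List<String> specs = Arrays.asList("post", "post2");
    List<Object> cached = Arrays.<Object>asList(10, 20);

    System.out.println(Arrays.toString(restoreOld(specs, cached, 0, 2))); // prints [20, null]
    System.out.println(Arrays.toString(restoreNew(specs, cached, 0, 2))); // prints [10, 20]
  }
}
```

With a single post-aggregator both loops produce the same row, which is consistent with the test change in this PR adding a second `ConstantPostAggregator`.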
@@ -986,7 +986,8 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal
)
)
.setPostAggregatorSpecs(
-ImmutableList.of(new ConstantPostAggregator("post", 10))
+new ConstantPostAggregator("post", 10),
+new ConstantPostAggregator("post2", 20)
)
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
@@ -1017,14 +1018,14 @@ private void doTestCacheStrategy(final ColumnType valueType, final Object dimVal
Assert.assertEquals(result1, fromCacheResult);

// test timestamps that result in integer size millis
-final ResultRow result2 = ResultRow.of(123L, dimValue, 1, dimValue, 10);
+final ResultRow result2 = ResultRow.of(123L, dimValue, 1, dimValue, 10, 20);

// Please see the comments on aggregator serde and type handling in CacheStrategy.fetchAggregatorsFromCache()
final ResultRow typeAdjustedResult2;
if (valueType.is(ValueType.FLOAT)) {
-typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2.1d, 10);
+typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2.1d, 10, 20);
} else if (valueType.is(ValueType.LONG)) {
-typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2, 10);
+typeAdjustedResult2 = ResultRow.of(123L, dimValue, 1, 2, 10, 20);
} else {
typeAdjustedResult2 = result2;
}