
Use dynamic filter to prune Iceberg splits based on partition values #9193

Merged — 5 commits, Oct 25, 2021

Conversation

@alexjo2144 (Member) commented Sep 9, 2021

For #4115

session.getTimeZoneKey(),
maxSplitsPerSecond,
maxOutstandingSplits,
new BoundedExecutor(executorService, splitLoaderConcurrency),
Member Author:

I'm not sure this needs to be configurable; we might just want to hardcode the thread count.

Member:

Maybe Iceberg is using multiple threads internally, but isn't the split loading still single-threaded per split source on the Trino side?

Member:

Can we hard-code splitLoaderConcurrency to 1, given that we're not parallelising split loading in IcebergSplitSource?

Member Author:

That, or we could just pass the ExecutorService straight through without wrapping it in a BoundedExecutor.

Member Author:

It's not completely single-threaded, because the AsyncQueue uses the executor for some of its operations.

Member:

Wouldn't setting splitLoaderConcurrency to 1 result in a deadlock, as we would use one thread for:

addExceptionCallback(ResumableTasks.submit(executor, loaderTask), this::onLoaderFailure);

And then there would be no threads left for splitQueue handling:

new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor);
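The starvation concern raised above can be demonstrated with a plain single-thread pool (a BoundedExecutor with concurrency 1 behaves the same way). This is a hypothetical standalone sketch, not code from the PR: the "loader task" occupies the only thread and then waits for a second task submitted to the same executor, which can therefore never run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SingleThreadStarvationDemo
{
    // Returns the simple name of the exception caused by starvation,
    // or "no starvation" if the inner task managed to run.
    static String demonstrate() throws InterruptedException
    {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // The "loader task" occupies the only thread and waits for a
            // second task (standing in for the AsyncQueue's work) submitted
            // to the same executor.
            Future<String> outer = executor.submit(() -> {
                Future<String> inner = executor.submit(() -> "queue work");
                // The inner task can never start: the only thread is blocked here.
                return inner.get(200, TimeUnit.MILLISECONDS);
            });
            try {
                outer.get();
                return "no starvation";
            }
            catch (ExecutionException e) {
                // TimeoutException from inner.get(...) shows the starvation.
                return e.getCause().getClass().getSimpleName();
            }
        }
        finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(demonstrate()); // prints "TimeoutException"
    }
}
```

With a real deadlock (no timeout on the inner get) the outer task would simply hang forever, which is the scenario the comment warns about.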

@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch from 2ba2e75 to 67934a5 (September 9, 2021 22:43)

return TaskStatus.finished();
}

public boolean partitionPassesDynamicFilter(Map<Integer, String> partitionKeys)
Member:

Can we have common code for this and TableStatisticsMaker#dataFileMatches?

Member:

Can we also look at the file-level Domain for the columns for which we have a dynamic filter, and prune splits based on that? We don't necessarily have to restrict ourselves to partitioned columns here.
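The suggestion above — pruning on file-level statistics rather than only partition values — boils down to an interval-overlap check. This is a hypothetical sketch, not the PR's code: the real implementation would build a Trino Domain from Iceberg's lowerBounds/upperBounds file metrics, but the core test is the same.

```java
public class FileRangePruning
{
    // Hypothetical helper: a data file can be skipped when the [fileMin, fileMax]
    // statistics for a column cannot overlap the filter's [filterMin, filterMax].
    static boolean fileMayMatch(long fileMin, long fileMax, long filterMin, long filterMax)
    {
        // Two closed intervals overlap iff each one starts before the other ends.
        return fileMax >= filterMin && fileMin <= filterMax;
    }

    public static void main(String[] args)
    {
        // A file covering [0, 10] cannot match a filter on [20, 30]: prune it.
        System.out.println(fileMayMatch(0, 10, 20, 30));  // false
        // A file covering [15, 25] overlaps the filter: keep it.
        System.out.println(fileMayMatch(15, 25, 20, 30)); // true
    }
}
```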

@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch 3 times, most recently from 497985f to 3d85856 (September 13, 2021 17:31)
@alexjo2144 (Member Author):

Comments addressed, thanks @raunaqmorarka

@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch from 3d85856 to e34a1f8 (September 13, 2021 18:32)
@raunaqmorarka (Member) left a comment:

Please share tpch/tpcds benchmark results after applying the current round of comments.

private void checkLoaderException()
{
    if (loaderException != null) {
        if (loaderException instanceof TrinoException) {
Member:

For IOException, maybe we should throw TrinoException(ICEBERG_FILESYSTEM_ERROR, e). Though I don't know what kind of exceptions can be produced here.

Member Author:

Maybe. None of the Iceberg TableScan methods throw a checked IOException, but it seems reasonable that they could.

@findepi requested a review from losipiuk (September 16, 2021 14:52)
@findepi (Member) left a comment:

(skim)

@Provides
public ExecutorService createIcebergExecutor(CatalogName catalogName)
{
    return newCachedThreadPool(daemonThreadsNamed("iceberg-" + catalogName + "-%s"));
Member:

We should cleanly forbid catalog names containing %s (or sanitize catalogName in places like this).

Preexisting (for Hive); no change requested here.

}
catch (IOException e) {
throw new UncheckedIOException(e);
checkLoaderException();
Member:

any reason to try fail close?

@electrum (Member) left a comment:

The split queuing adds significant complexity. Why do we need this feature? It was added to Hive because someone wanted to reduce load on HDFS, but I haven’t heard anyone ask about this for Iceberg.

@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch from e34a1f8 to 3823f26 (September 17, 2021 16:22)
@sopel39 (Member) commented Sep 17, 2021

> The split queuing adds significant complexity. Why do we need this feature? It was added to Hive because someone wanted to reduce load on HDFS, but I haven't heard anyone ask about this for Iceberg.

It's not only about load, but also about performance. DF brings huge perf improvements, so if Iceberg is much slower than the Hive connector, it won't be a feasible alternative.

@sopel39 (Member) commented Sep 17, 2021

@alexjo2144 would it be possible to get macro benchmarks result?

@electrum (Member) commented Sep 17, 2021

Dynamic filtering is obviously something we want. But I don't see why it is tied to split throttling. If we want to block, we can simply return the DF future from ConnectorSplitSource.getNextBatch():

ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false);

long startTime; // time of first getNextBatch() call
long elapsed = NANOSECONDS.toMillis(System.nanoTime() - startTime);
long remainingWait = waitTime.toMillis() - elapsed;

if ((remainingWait > 0) && dynamicFilter.isBlocked()) {
    return dynamicFilter.isBlocked()
            .thenApply(ignored -> EMPTY_BATCH)
            .completeOnTimeout(EMPTY_BATCH, remainingWait, MILLIS);
}
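The fragment above can be made concrete with java.util.concurrent.CompletableFuture. This is a hypothetical self-contained sketch (SplitBatch stands in for Trino's ConnectorSplitBatch; nextBatch and "split-1" are invented names): if the dynamic filter is still collecting, return a future that yields an empty batch either when the filter completes or when the remaining wait budget runs out, whichever comes first.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class DynamicFilterWaitSketch
{
    // Stand-in for ConnectorSplitBatch: the real Trino type carries splits
    // plus a noMoreSplits flag.
    record SplitBatch(List<String> splits, boolean noMoreSplits) {}

    static final SplitBatch EMPTY_BATCH = new SplitBatch(List.of(), false);

    static CompletableFuture<SplitBatch> nextBatch(CompletableFuture<?> dfBlocked, long remainingWaitMillis)
    {
        if (remainingWaitMillis > 0 && !dfBlocked.isDone()) {
            // Unblock on filter completion or on timeout, returning an empty
            // (non-final) batch either way so the scheduler calls again.
            return dfBlocked
                    .thenApply(ignored -> EMPTY_BATCH)
                    .completeOnTimeout(EMPTY_BATCH, remainingWaitMillis, MILLISECONDS);
        }
        // Filter done or wait budget exhausted: load real splits.
        return CompletableFuture.completedFuture(new SplitBatch(List.of("split-1"), true));
    }

    public static void main(String[] args)
    {
        // Filter never completes: we unblock via the timeout with an empty batch.
        SplitBatch timedOut = nextBatch(new CompletableFuture<String>(), 50).join();
        System.out.println(timedOut.splits().isEmpty() + " " + timedOut.noMoreSplits());
    }
}
```

completeOnTimeout (Java 9+) is what replaces the explicit timer bookkeeping in the fragment above: it completes the dependent future with the fallback value if the source has not completed within the deadline.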

@alexjo2144 (Member Author):

> Dynamic filtering is obviously something we want. But I don't see why it is tied with split throttling

I can take the throttling part out. It only got included because I was trying to mimic some of the Hive pattern and used ThrottledAsyncQueue, but we can use a non-throttled version.

@raunaqmorarka (Member):

@electrum In the Hive connector, although we've implemented the blocking for DF, we keep it turned off by default to avoid any regressions for small queries. If we keep it turned off by default here as well, wouldn't that just result in all the splits getting generated and scheduled up front? If so, we would have to set some non-zero default for the DF blocking timeout, and that can potentially slow down some short-running queries, or queries that waited on a DF which turned out not to be useful.
Also, does throttling of split generation help with reducing memory usage on the coordinator, or reducing wasted work when a table scan is aborted due to a LIMIT?

.map(IcebergSplit.class::cast)
.forEach(splits::add);
}
assertThat(splits.build().size()).isGreaterThan(0);
Member:

You can assert that you actually waited ~2s here.
Btw, can you make the test work without waiting? Maybe using an artificial Ticker instead of wall time.
You may also verify that the future returned from splitSource.getNextBatch gets unblocked when you complete the future returned from DynamicFilter.isBlocked.

Ignore if too painful.

@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch from d2c9e08 to 103a99d (October 21, 2021 19:32)
@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch from 103a99d to fcc241c (October 21, 2021 19:52)
@alexjo2144 (Member Author):

All set, thanks for the comments @losipiuk

@findepi (Member) commented Oct 22, 2021

CI #9620 (reopened)

column.getType(),
fromByteBuffer(type, lowerBounds.get(fieldId)),
fromByteBuffer(type, upperBounds.get(fieldId)),
mayContainNulls);
Member:

We can recognize an all-null column when scanTask.file().valueCounts() matches scanTask.file().nullValueCounts() at a given key; then we could create Domain.onlyNull for that case.

Member:

Let's maybe do this in a follow-up.

Member:

Yeah, we do that in TupleDomainParquetPredicate as well.
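The all-null detection suggested in this thread reduces to comparing two per-field counters from the file metrics. A minimal hypothetical sketch (the maps stand in for Iceberg's valueCounts()/nullValueCounts(), keyed by field id; isAllNulls is an invented helper name):

```java
import java.util.Map;
import java.util.Objects;

public class AllNullColumnCheck
{
    // A column is all-null in a data file when its total value count equals
    // its null value count in the file metrics for that field id.
    static boolean isAllNulls(Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts, int fieldId)
    {
        Long values = valueCounts.get(fieldId);
        Long nulls = nullValueCounts.get(fieldId);
        // Missing metrics mean we cannot conclude anything.
        return values != null && Objects.equals(values, nulls);
    }

    public static void main(String[] args)
    {
        System.out.println(isAllNulls(Map.of(1, 100L), Map.of(1, 100L), 1)); // true
        System.out.println(isAllNulls(Map.of(1, 100L), Map.of(1, 7L), 1));   // false
    }
}
```

When the check passes, the split source could emit Domain.onlyNull(type) for that column instead of a min/max range, which lets an IS NOT NULL dynamic filter prune the file entirely.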

@alexjo2144 force-pushed the iceberg/dynamic-filter-partitions branch from fcc241c to 0924252 (October 22, 2021 17:32)
@alexjo2144 (Member Author) commented Oct 22, 2021

Discussed offline: I removed the Executor and any async handling, since it does not seem to be useful. SourcePartitionedScheduler only ever has one outstanding batch at a time for non-grouped execution, so doing the partition filtering on an async thread does not allow for parallel execution unless we also have grouped execution.

@alexjo2144 (Member Author):

CI Flake: #7224

@losipiuk (Member):

Changes LGTM
