Skip to content

Commit

Permalink
Get result iterator in PinotBrokerPageSource constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 10, 2023
1 parent 3e54e0e commit a35f374
Showing 1 changed file with 5 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,16 @@ public class PinotBrokerPageSource
implements ConnectorPageSource
{
private final PinotQueryInfo query;
private final PinotClient pinotClient;
private final ConnectorSession session;
private final List<PinotColumnHandle> columnHandles;
private final List<Decoder> decoders;
private final BlockBuilder[] columnBuilders;
private final long readTimeNanos;
private final Iterator<BrokerResultRow> resultIterator;

private boolean finished;
private long readTimeNanos;
private long completedBytes;
private final AtomicLong currentRowCount = new AtomicLong();
private final int limitForBrokerQueries;

private Iterator<BrokerResultRow> resultIterator;

public PinotBrokerPageSource(
ConnectorSession session,
PinotQueryInfo query,
Expand All @@ -62,16 +58,16 @@ public PinotBrokerPageSource(
int limitForBrokerQueries)
{
this.query = requireNonNull(query, "query is null");
this.pinotClient = requireNonNull(pinotClient, "pinotClient is null");
this.session = requireNonNull(session, "session is null");
this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
this.decoders = createDecoders(columnHandles);
this.limitForBrokerQueries = limitForBrokerQueries;

this.columnBuilders = columnHandles.stream()
.map(PinotColumnHandle::getDataType)
.map(type -> type.createBlockBuilder(null, 1))
.toArray(BlockBuilder[]::new);
long start = System.nanoTime();
resultIterator = pinotClient.createResultIterator(session, query, columnHandles);
readTimeNanos = System.nanoTime() - start;
}

private static List<Decoder> createDecoders(List<PinotColumnHandle> columnHandles)
Expand Down Expand Up @@ -107,12 +103,6 @@ public Page getNextPage()
if (finished) {
return null;
}
if (resultIterator == null) {
long start = System.nanoTime();
resultIterator = pinotClient.createResultIterator(session, query, columnHandles);
readTimeNanos = System.nanoTime() - start;
}

if (!resultIterator.hasNext()) {
finished = true;
return null;
Expand Down

0 comments on commit a35f374

Please sign in to comment.