Skip to content

Commit

Permalink
feat: log number of row groups
Browse files Browse the repository at this point in the history
Print the number of fetched row groups to an external log file.
  • Loading branch information
rimarin committed Feb 22, 2024
1 parent 6eb7745 commit 5164a1c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
1 change: 1 addition & 0 deletions extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct ParquetReaderScanState {
bool prefetch_mode = false;
bool current_group_prefetched = false;
set<string> fetchedFiles;
unordered_map<string, set<int64_t>> fetchedRowGroups;
};

struct ParquetColumnDefinition {
Expand Down
13 changes: 13 additions & 0 deletions extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState {
vector<column_t> column_ids;
TableFilterSet *filters;
set<string> fetchedFiles;
unordered_map<string, set<int64_t>> fetchedRowsGroups;

idx_t MaxThreads() const override {
return max_threads;
Expand Down Expand Up @@ -615,6 +616,9 @@ class ParquetScanFunction {
}

gstate.fetchedFiles.insert(data.scan_state.fetchedFiles.begin(), data.scan_state.fetchedFiles.end());
for (const auto &fileToRowsGroups : data.scan_state.fetchedRowGroups){
gstate.fetchedRowsGroups[fileToRowsGroups.first].insert(fileToRowsGroups.second.begin(), fileToRowsGroups.second.end());
}

bind_data.chunk_count++;
if (output.size() > 0) {
Expand All @@ -629,6 +633,15 @@ class ParquetScanFunction {
numPartitionsFile.open("partitions.log", std::fstream::out);
numPartitionsFile << gstate.fetchedFiles.size() << "\n";
numPartitionsFile.close();
// Log the number of fetched row groups
uint64_t totalRowGroups = 0;
for (const auto &fileToRowsGroups : gstate.fetchedRowsGroups){
totalRowGroups += fileToRowsGroups.second.size();
}
std::ofstream numRowGroupsFile;
numRowGroupsFile.open("row_groups.log", std::fstream::out);
numRowGroupsFile << totalRowGroups << "\n";
numRowGroupsFile.close();
}

static unique_ptr<NodeStatistics> ParquetCardinality(ClientContext &context, const FunctionData *bind_data) {
Expand Down
15 changes: 10 additions & 5 deletions extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,19 +910,24 @@ bool ParquetReader::ScanInternal(ParquetReaderScanState &state, DataChunk &resul
}

uint64_t to_scan_compressed_bytes = 0;
bool columnSkipped = false;
bool rowGroupSkipped = false;
for (idx_t col_idx = 0; col_idx < reader_data.column_ids.size(); col_idx++) {
columnSkipped |= PrepareRowGroupBuffer(state, col_idx);
// True: row group is relevant for the query filters
// False: row group was pruned thanks to columnar statistics
// True: column chunk was pruned thanks to columnar statistics
// False: row group is relevant for the query filters
bool columnChunkSkipped = PrepareRowGroupBuffer(state, col_idx);
rowGroupSkipped |= columnChunkSkipped;

auto file_col_idx = reader_data.column_ids[col_idx];

auto &root_reader = state.root_reader->Cast<StructColumnReader>();
to_scan_compressed_bytes += root_reader.GetChildReader(file_col_idx)->TotalCompressedSize();
}

if (!columnSkipped){
if (!rowGroupSkipped){
// Keep track of the fetched row group
auto fetchedRowGroup = state.group_idx_list[state.current_group];
state.fetchedRowGroups[file_name].emplace(fetchedRowGroup);
// Keep track of the fetched file
state.fetchedFiles.emplace(file_name);
}

Expand Down

0 comments on commit 5164a1c

Please sign in to comment.