Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Upgrade lq cmd tool to be able to inspect new versioned format #334

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion cpp/src/lance/arrow/fragment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ ::arrow::Result<::arrow::RecordBatchGenerator> LanceFragment::ScanBatchesAsync(
return ::arrow::RecordBatchGenerator(std::move(batch_reader));
}

/// Count the rows of this fragment for the given predicate.
///
/// A predicate that references no fields is answered by submitting
/// `FastCountRows()` as a task to the executor obtained from
/// `options->io_context`. A predicate that does reference fields is delegated
/// to the base-class `::arrow::dataset::Fragment::CountRows()`.
///
/// \param predicate the filter expression
/// \param options scan options; provides the IO executor for the fast path
/// \return a future holding the optional row count; if the task could not be
///         submitted, the submission status is returned instead
::arrow::Future<std::optional<int64_t>> LanceFragment::CountRows(
    ::arrow::compute::Expression predicate,
    const std::shared_ptr<::arrow::dataset::ScanOptions>& options) {
  if (::arrow::compute::ExpressionHasFieldRefs(predicate)) {
    // The predicate refers to fields: defer to the default implementation.
    return ::arrow::dataset::Fragment::CountRows(predicate, options);
  }

  auto* io_executor = options->io_context.executor();
  auto submitted = io_executor->Submit(
      [](auto self) -> ::arrow::Result<std::optional<int64_t>> {
        return self->FastCountRows();
      },
      this);
  if (submitted.ok()) {
    return submitted.ValueOrDie();
  }
  return submitted.status();
}

/// Produce the physical schema of this fragment by converting the object
/// returned from `schema()` with its `ToArrow()` method.
::arrow::Result<std::shared_ptr<::arrow::Schema>> LanceFragment::ReadPhysicalSchemaImpl() {
  const auto& lance_schema = schema();
  return lance_schema->ToArrow();
}
Expand Down Expand Up @@ -115,7 +133,13 @@ const std::shared_ptr<lance::format::DataFragment>& LanceFragment::data_fragment
return fragment_;
}

::arrow::Result<int64_t> LanceFragment::FastCountRow() const {
/// Report the number of batches, as given by the reader that `OpenReader(0)`
/// returns.
///
/// \return the batch count, `Status::Invalid` when the fragment has no data
///         files, or the error produced while opening the reader
::arrow::Result<int32_t> LanceFragment::num_batches() const {
  // Checked at runtime rather than with `assert`: an assert is compiled out
  // under NDEBUG, which would let `OpenReader(0)` run against an empty list.
  if (fragment_->data_files().empty()) {
    return ::arrow::Status::Invalid("LanceFragment::num_batches: fragment has no data files");
  }
  ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(0));
  return reader->num_batches();
}

::arrow::Result<std::optional<int64_t>> LanceFragment::FastCountRows() const {
assert(!fragment_->data_files().empty());
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(0));
return reader->length();
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/lance/arrow/fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class LanceFragment : public ::arrow::dataset::Fragment {
::arrow::Result<::arrow::RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<::arrow::dataset::ScanOptions>& options) override;

::arrow::Future<std::optional<int64_t>> CountRows(
::arrow::compute::Expression predicate,
const std::shared_ptr<::arrow::dataset::ScanOptions>& options) override;

std::string type_name() const override { return "lance"; }

/// Open Data files that contains the columns in the schema.
Expand All @@ -91,12 +95,14 @@ class LanceFragment : public ::arrow::dataset::Fragment {
/// Access the data fragment (POD).
const std::shared_ptr<lance::format::DataFragment>& data_fragment() const;

::arrow::Result<int32_t> num_batches() const;

protected:
::arrow::Result<std::shared_ptr<::arrow::Schema>> ReadPhysicalSchemaImpl() override;

private:
/// Fast path for `CountRows()`: it reads only the metadata of one data file.
::arrow::Result<int64_t> FastCountRow() const;
::arrow::Result<std::optional<int64_t>> FastCountRows() const;

/// Open file reader on a data file.
///
Expand Down
49 changes: 35 additions & 14 deletions cpp/src/lance/cmd/lance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
// limitations under the License.

#include <arrow/dataset/api.h>
#include <arrow/status.h>
#include <fmt/format.h>
#include <fmt/ranges.h>

#include <argparse/argparse.hpp>
#include <iostream>
#include <optional>
#include <string>

#include "lance/arrow/dataset.h"
#include "lance/arrow/fragment.h"
#include "lance/arrow/type.h"
#include "lance/arrow/utils.h"
#include "lance/io/reader.h"
Expand All @@ -28,29 +32,39 @@ using std::string;

/// Print a separator line: an empty string centered in a field of `width`
/// characters and padded with '-', followed by a newline.
///
/// \param width total width of the separator, 40 by default
void PrintLine(int width = 40) {
  fmt::print("{:-^{}}\n", "", width);
}

::arrow::Status PrintSchema(const std::shared_ptr<::arrow::dataset::FileSystemDataset>& dataset) {
auto files = dataset->files();
auto infile = dataset->filesystem()->OpenInputFile(files[0]).ValueOrDie();
auto reader = lance::io::FileReader::Make(infile).ValueOrDie();
auto schema = reader->schema();
/// Print the schema of the first fragment of `dataset`.
///
/// \param dataset the dataset to inspect
/// \return OK once the schema has been printed; otherwise the error from
///         listing fragments, `Status::Invalid` if the dataset yields no
///         fragment, or `Status::TypeError` if the first fragment is not a
///         `lance::arrow::LanceFragment`
::arrow::Status PrintSchema(const std::shared_ptr<lance::arrow::LanceDataset>& dataset) {
  PrintLine();
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  ARROW_ASSIGN_OR_RAISE(auto first, fragments.Next());
  // Checked at runtime rather than with `assert`, which is compiled out under
  // NDEBUG and would leave a null pointer to be dereferenced below.
  if (first == nullptr) {
    return ::arrow::Status::Invalid("Dataset does not contain any fragment");
  }
  auto fragment = std::dynamic_pointer_cast<lance::arrow::LanceFragment>(first);
  if (fragment == nullptr) {
    // The downcast fails when the fragment is of a different concrete type.
    return ::arrow::Status::TypeError("Expected a lance fragment, got: ", first->type_name());
  }
  auto schema = fragment->schema();
  fmt::print("Schema:\n");
  lance::format::Print(*schema);
  return ::arrow::Status::OK();
}

::arrow::Status PrintSummary(const std::shared_ptr<::arrow::dataset::FileSystemDataset>& dataset) {
::arrow::Status PrintSummary(const std::shared_ptr<lance::arrow::LanceDataset>& dataset) {
assert(dataset);
PrintLine();
fmt::print("Summary: \n");
int num_batches = 0;
int total = 0;
for (auto file : dataset->files()) {
ARROW_ASSIGN_OR_RAISE(auto infile, dataset->filesystem()->OpenInputFile(file));
ARROW_ASSIGN_OR_RAISE(auto reader, lance::io::FileReader::Make(infile));
num_batches += reader->num_batches();
total += reader->length();
auto scanner = dataset->NewScan().ValueOrDie()->Finish().ValueOrDie();
ARROW_ASSIGN_OR_RAISE(auto fragment_iter, dataset->GetFragments());
while (true) {
ARROW_ASSIGN_OR_RAISE(auto fragment, fragment_iter.Next());
if (!fragment) {
break;
}
auto lfragment = std::dynamic_pointer_cast<lance::arrow::LanceFragment>(fragment);
ARROW_ASSIGN_OR_RAISE(int32_t batches, lfragment->num_batches());
num_batches += batches;
auto fut = lfragment->CountRows(::arrow::compute::literal(true), scanner->options());
ARROW_ASSIGN_OR_RAISE(auto cnt, fut.result());
total += cnt.value_or(0);
}
fmt::print(" Number of versions: {}\n", dataset->versions().ValueOrDie().size());
fmt::print(" Total records: {}\n", total);
fmt::print(" Number of batches: {}\n", num_batches);
fmt::print(" Mean batch size: {}\n", total / num_batches + 1);
Expand All @@ -59,8 +73,14 @@ ::arrow::Status PrintSummary(const std::shared_ptr<::arrow::dataset::FileSystemD

::arrow::Status inspect(const argparse::ArgumentParser& args) {
auto uri = args.get<string>("uri");
std::optional<uint64_t> version;
if (args.is_used("--dataset-version")) {
version = args.get<uint64_t>("--dataset-version");
}
std::string path;
ARROW_ASSIGN_OR_RAISE(auto fs, ::arrow::fs::FileSystemFromUriOrPath(uri, &path));
fmt::print("Inspecting dataset: {}\n", uri);
ARROW_ASSIGN_OR_RAISE(auto dataset, lance::arrow::OpenDataset(uri));
ARROW_ASSIGN_OR_RAISE(auto dataset, lance::arrow::LanceDataset::Make(fs, path, version));
ARROW_RETURN_NOT_OK(PrintSummary(dataset));
ARROW_RETURN_NOT_OK(PrintSchema(dataset));

Expand All @@ -74,6 +94,7 @@ int main(int argc, char** argv) {
argparse::ArgumentParser inspect_parser("inspect");
inspect_parser.add_description("Inspect dataset");
inspect_parser.add_argument("uri").help("Dataset URI").required();
inspect_parser.add_argument("-V", "--dataset-version").help("specify the version");
parser.add_subparser(inspect_parser);

try {
Expand Down