Skip to content

Commit

Permalink
[C++] Upgrade lq cmd tool to be able to inspect new versioned format (
Browse files Browse the repository at this point in the history
#334)

* change cmd to support versioned dataset

* link fast counts

* revert
  • Loading branch information
eddyxu authored Nov 28, 2022
1 parent 8899d86 commit 1adda78
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 16 deletions.
26 changes: 25 additions & 1 deletion cpp/src/lance/arrow/fragment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ ::arrow::Result<::arrow::RecordBatchGenerator> LanceFragment::ScanBatchesAsync(
return ::arrow::RecordBatchGenerator(std::move(batch_reader));
}

/// Count the rows in this fragment that match `predicate`.
///
/// When the predicate references no fields, the count is obtained from
/// `FastCountRows()`, scheduled on the executor of `options->io_context`.
/// Otherwise this defers to the default `::arrow::dataset::Fragment::CountRows`.
///
/// \param predicate filter expression to count against
/// \param options scan options; its `io_context` supplies the executor for the fast path
/// \return a future holding the row count, or the error raised while scheduling
///         or running the task
::arrow::Future<std::optional<int64_t>> LanceFragment::CountRows(
    ::arrow::compute::Expression predicate,
    const std::shared_ptr<::arrow::dataset::ScanOptions>& options) {
  if (!::arrow::compute::ExpressionHasFieldRefs(predicate)) {
    // The task may run after the caller has released its reference to this
    // fragment, so pin the fragment for as long as the task exists. `lock()`
    // yields an empty pointer (rather than throwing) when the fragment is not
    // owned by a shared_ptr, in which case the caller remains responsible for
    // keeping it alive, as before.
    std::shared_ptr<::arrow::dataset::Fragment> keep_alive = weak_from_this().lock();
    auto result = options->io_context.executor()->Submit(
        [keep_alive = std::move(keep_alive),
         fragment = this]() -> ::arrow::Result<std::optional<int64_t>> {
          (void)keep_alive;  // Held only to extend the fragment's lifetime.
          return fragment->FastCountRows();
        });
    if (!result.ok()) {
      return result.status();
    }
    return result.ValueOrDie();
  }
  // Fallback to default method
  return ::arrow::dataset::Fragment::CountRows(predicate, options);
}

/// Return the physical schema of this fragment, obtained by converting the
/// fragment's own schema with `ToArrow()`.
::arrow::Result<std::shared_ptr<::arrow::Schema>> LanceFragment::ReadPhysicalSchemaImpl() {
  const auto& lance_schema = schema();
  return lance_schema->ToArrow();
}
Expand Down Expand Up @@ -115,7 +133,13 @@ const std::shared_ptr<lance::format::DataFragment>& LanceFragment::data_fragment
return fragment_;
}

::arrow::Result<int64_t> LanceFragment::FastCountRow() const {
/// Number of batches reported by the reader of this fragment's data file 0.
///
/// \return the batch count, or the error raised while opening the reader
::arrow::Result<int32_t> LanceFragment::num_batches() const {
  // Invariant (debug builds only): the fragment has at least one data file.
  assert(!fragment_->data_files().empty());
  ARROW_ASSIGN_OR_RAISE(auto first_file_reader, OpenReader(0));
  auto batch_count = first_file_reader->num_batches();
  return batch_count;
}

::arrow::Result<std::optional<int64_t>> LanceFragment::FastCountRows() const {
assert(!fragment_->data_files().empty());
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(0));
return reader->length();
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/lance/arrow/fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class LanceFragment : public ::arrow::dataset::Fragment {
::arrow::Result<::arrow::RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<::arrow::dataset::ScanOptions>& options) override;

/// Count the rows of this fragment that match `predicate`.
///
/// If the predicate references no fields, the count comes from the private
/// fast path, submitted to the executor of `options->io_context`; otherwise
/// the base-class `::arrow::dataset::Fragment::CountRows` is used.
::arrow::Future<std::optional<int64_t>> CountRows(
::arrow::compute::Expression predicate,
const std::shared_ptr<::arrow::dataset::ScanOptions>& options) override;

std::string type_name() const override { return "lance"; }

/// Open Data files that contains the columns in the schema.
Expand All @@ -91,12 +95,14 @@ class LanceFragment : public ::arrow::dataset::Fragment {
/// Access the data fragment (POD).
const std::shared_ptr<lance::format::DataFragment>& data_fragment() const;

/// Number of batches, as reported by the reader opened on data file 0.
/// Returns an error if that reader cannot be opened.
::arrow::Result<int32_t> num_batches() const;

protected:
::arrow::Result<std::shared_ptr<::arrow::Schema>> ReadPhysicalSchemaImpl() override;

private:
/// Fast path for `CountRows()`: it only reads the metadata of one data file.
::arrow::Result<int64_t> FastCountRow() const;
::arrow::Result<std::optional<int64_t>> FastCountRows() const;

/// Open file reader on a data file.
///
Expand Down
49 changes: 35 additions & 14 deletions cpp/src/lance/cmd/lance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
// limitations under the License.

#include <arrow/dataset/api.h>
#include <arrow/status.h>
#include <fmt/format.h>
#include <fmt/ranges.h>

#include <argparse/argparse.hpp>
#include <iostream>
#include <optional>
#include <string>

#include "lance/arrow/dataset.h"
#include "lance/arrow/fragment.h"
#include "lance/arrow/type.h"
#include "lance/arrow/utils.h"
#include "lance/io/reader.h"
Expand All @@ -28,29 +32,39 @@ using std::string;

/// Print a horizontal rule: an empty string centered in a field of `width`
/// characters padded with '-', followed by a newline.
void PrintLine(int width = 40) {
  fmt::print("{:-^{}}\n", "", width);
}

/// Print the schema of `dataset`, taken from its first fragment.
///
/// \param dataset the Lance dataset to inspect
/// \return OK on success; the error from listing fragments; or `Invalid` when
///         the dataset has no fragments or its first fragment is not a
///         `lance::arrow::LanceFragment`
::arrow::Status PrintSchema(const std::shared_ptr<lance::arrow::LanceDataset>& dataset) {
  PrintLine();
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  ARROW_ASSIGN_OR_RAISE(auto first, fragments.Next());
  // An exhausted iterator yields a null fragment. Report it as an error
  // instead of relying on an assert that is compiled out of release builds.
  if (first == nullptr) {
    return ::arrow::Status::Invalid("Dataset has no fragments; cannot print schema");
  }
  auto fragment = std::dynamic_pointer_cast<lance::arrow::LanceFragment>(first);
  // dynamic_pointer_cast returns null when the fragment has another type.
  if (fragment == nullptr) {
    return ::arrow::Status::Invalid("First fragment is not a Lance fragment");
  }
  auto schema = fragment->schema();
  fmt::print("Schema:\n");
  lance::format::Print(*schema);
  return ::arrow::Status::OK();
}

::arrow::Status PrintSummary(const std::shared_ptr<::arrow::dataset::FileSystemDataset>& dataset) {
::arrow::Status PrintSummary(const std::shared_ptr<lance::arrow::LanceDataset>& dataset) {
assert(dataset);
PrintLine();
fmt::print("Summary: \n");
int num_batches = 0;
int total = 0;
for (auto file : dataset->files()) {
ARROW_ASSIGN_OR_RAISE(auto infile, dataset->filesystem()->OpenInputFile(file));
ARROW_ASSIGN_OR_RAISE(auto reader, lance::io::FileReader::Make(infile));
num_batches += reader->num_batches();
total += reader->length();
auto scanner = dataset->NewScan().ValueOrDie()->Finish().ValueOrDie();
ARROW_ASSIGN_OR_RAISE(auto fragment_iter, dataset->GetFragments());
while (true) {
ARROW_ASSIGN_OR_RAISE(auto fragment, fragment_iter.Next());
if (!fragment) {
break;
}
auto lfragment = std::dynamic_pointer_cast<lance::arrow::LanceFragment>(fragment);
ARROW_ASSIGN_OR_RAISE(int32_t batches, lfragment->num_batches());
num_batches += batches;
auto fut = lfragment->CountRows(::arrow::compute::literal(true), scanner->options());
ARROW_ASSIGN_OR_RAISE(auto cnt, fut.result());
total += cnt.value_or(0);
}
fmt::print(" Number of versions: {}\n", dataset->versions().ValueOrDie().size());
fmt::print(" Total records: {}\n", total);
fmt::print(" Number of batches: {}\n", num_batches);
fmt::print(" Mean batch size: {}\n", total / num_batches + 1);
Expand All @@ -59,8 +73,14 @@ ::arrow::Status PrintSummary(const std::shared_ptr<::arrow::dataset::FileSystemD

::arrow::Status inspect(const argparse::ArgumentParser& args) {
auto uri = args.get<string>("uri");
std::optional<uint64_t> version;
if (args.is_used("--dataset-version")) {
version = args.get<uint64_t>("--dataset-version");
}
std::string path;
ARROW_ASSIGN_OR_RAISE(auto fs, ::arrow::fs::FileSystemFromUriOrPath(uri, &path));
fmt::print("Inspecting dataset: {}\n", uri);
ARROW_ASSIGN_OR_RAISE(auto dataset, lance::arrow::OpenDataset(uri));
ARROW_ASSIGN_OR_RAISE(auto dataset, lance::arrow::LanceDataset::Make(fs, path, version));
ARROW_RETURN_NOT_OK(PrintSummary(dataset));
ARROW_RETURN_NOT_OK(PrintSchema(dataset));

Expand All @@ -74,6 +94,7 @@ int main(int argc, char** argv) {
argparse::ArgumentParser inspect_parser("inspect");
inspect_parser.add_description("Inspect dataset");
inspect_parser.add_argument("uri").help("Dataset URI").required();
inspect_parser.add_argument("-V", "--dataset-version").help("specify the version");
parser.add_subparser(inspect_parser);

try {
Expand Down

0 comments on commit 1adda78

Please sign in to comment.