Skip to content

Commit

Permalink
apacheGH-34051: [C++] GcsFileSystem lazily starts sequential reads (a…
Browse files Browse the repository at this point in the history
…pache#34052)

`OpenInputFile()` returns an `io::RandomAccessFile`, which supports sequential reads as well as random-access reads. The previous implementation eagerly started a sequential read, but many applications do not use that aspect of the API. Because GCS has fairly high latency, this can slow down applications that are only going to read data using `ReadAt()`. This includes applications using Parquet files via PyArrow.

Fixes apache#34051 

### What changes are included in this PR?

Change the GcsFileSystem class to lazily start the download used to implement the `io::InputFile` APIs.

### Are these changes tested?

I think so: the existing tests cover the affected functions.

### Are there any user-facing changes?

No.

* Closes: apache#34051

Authored-by: Carlos O'Ryan <coryan@google.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
coryan authored and Mike Hancock committed Feb 17, 2023
1 parent ed53892 commit ec55a6d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
56 changes: 32 additions & 24 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,32 +247,41 @@ using InputStreamFactory = std::function<Result<std::shared_ptr<GcsInputStream>>

class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
std::shared_ptr<GcsInputStream> stream)
: factory_(std::move(factory)),
metadata_(std::move(metadata)),
stream_(std::move(stream)) {}
GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata)
: factory_(std::move(factory)), metadata_(std::move(metadata)) {}
~GcsRandomAccessFile() override = default;

//@{
// @name FileInterface
Status Close() override { return stream_->Close(); }
Status Abort() override { return stream_->Abort(); }
Result<int64_t> Tell() const override { return stream_->TellOr(metadata_.size()); }
bool closed() const override { return stream_->closed(); }
Status Close() override {
ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->Close();
}
Status Abort() override {
ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->Abort();
}
Result<int64_t> Tell() const override {
ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->TellOr(metadata_.size());
}
bool closed() const override {
auto status = InitializeStream();
if (!status.ok()) return true;
return stream_->closed();
}
//@}

//@{
// @name Readable
Result<int64_t> Read(int64_t nbytes, void* out) override {
ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->Read(nbytes);
}
const arrow::io::IOContext& io_context() const override {
return stream_->io_context();
}
//@}

//@{
Expand Down Expand Up @@ -310,9 +319,16 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
}

private:
// Opens the sequential-read stream the first time it is needed.
//
// If `stream_` is already set this is a no-op. Otherwise `factory_` is invoked
// for the object generation recorded in `metadata_`, with a default
// `gcs::ReadFromOffset`, and the result is stored in `stream_`. Any error from
// the factory is returned to the caller and `stream_` is left unset.
Status InitializeStream() const {
  // Fast path: the stream was created by an earlier call.
  if (stream_) return Status::OK();
  ARROW_ASSIGN_OR_RAISE(stream_, factory_(gcs::Generation(metadata_.generation()),
                                          gcs::ReadFromOffset()));
  return Status::OK();
}
InputStreamFactory factory_;
gcs::ObjectMetadata metadata_;
std::shared_ptr<GcsInputStream> stream_;
std::shared_ptr<GcsInputStream> mutable stream_;
};

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
Expand Down Expand Up @@ -929,12 +945,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, offset);
};
ARROW_ASSIGN_OR_RAISE(auto stream,
impl_->OpenInputStream(p, gcs::Generation(metadata->generation()),
gcs::ReadFromOffset()));

return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
*std::move(metadata), std::move(stream));
*std::move(metadata));
}

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
Expand All @@ -947,16 +960,11 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
auto impl = impl_;
auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, offset);
};
ARROW_ASSIGN_OR_RAISE(auto stream,
impl_->OpenInputStream(p, gcs::Generation(metadata->generation()),
gcs::ReadFromOffset()));

return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
*std::move(metadata), std::move(stream));
*std::move(metadata));
}

Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,22 @@ TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
}
}

// Checks that a file returned by OpenInputFile() reports an IOContext with the
// same external id as the filesystem that opened it.
TEST_F(GcsIntegrationTest, OpenInputFileIoContext) {
  auto fs = GcsFileSystem::Make(TestGcsOptions());

  // Upload a small object to open afterwards.
  const auto object_path = PreexistingBucketPath() + "OpenInputFileIoContext/object-name";
  const std::string payload = "The quick brown fox jumps over the lazy dog";
  ASSERT_OK_AND_ASSIGN(auto writer, fs->OpenOutputStream(object_path, {}));
  ASSERT_OK(writer->Write(payload.data(), payload.size()));
  ASSERT_OK(writer->Close());

  // Query io_context() right after opening, before any read is issued.
  ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputFile(object_path));
  EXPECT_EQ(fs->io_context().external_id(), input->io_context().external_id());
}

TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
auto fs = GcsFileSystem::Make(TestGcsOptions());

Expand Down

0 comments on commit ec55a6d

Please sign in to comment.