From ec55a6d4b0607e44519327f91427c093f5d9a14a Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Tue, 7 Feb 2023 00:11:35 +0000 Subject: [PATCH] GH-34051: [C++] GcsFileSystem lazily starts sequential reads (#34052) `OpenInputFile()` returns a `io::RandomAccessFile` which supports sequential reads as well as random access reads. The previous implementation eagerly started a sequential read, but many applications do not use that aspect of the API. Because GCS has fairly high latency, this can slow down applications that are only going to read data using `ReadAt()`. This includes applications using Parquet files via PyArrow. Fixes #34051 ### What changes are included in this PR? Change the GcsFileSystem class to lazily start the download used to implement the `io::InputFile` APIs. ### Are these changes tested? I think so: the existing tests cover the affected functions. ### Are there any user-facing changes? No. * Closes: #34051 Authored-by: Carlos O'Ryan Signed-off-by: Sutou Kouhei --- cpp/src/arrow/filesystem/gcsfs.cc | 56 +++++++++++++++----------- cpp/src/arrow/filesystem/gcsfs_test.cc | 16 ++++++++ 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index ce11c0aa223cb..08099d94f96cb 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -247,32 +247,41 @@ using InputStreamFactory = std::function> class GcsRandomAccessFile : public arrow::io::RandomAccessFile { public: - GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata, - std::shared_ptr stream) - : factory_(std::move(factory)), - metadata_(std::move(metadata)), - stream_(std::move(stream)) {} + GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata) + : factory_(std::move(factory)), metadata_(std::move(metadata)) {} ~GcsRandomAccessFile() override = default; //@{ // @name FileInterface - Status Close() override { return stream_->Close(); } - Status Abort() override { return stream_->Abort(); } - Result Tell() const override { return stream_->TellOr(metadata_.size()); } - bool closed() const override { return stream_->closed(); } + Status Close() override { + ARROW_RETURN_NOT_OK(InitializeStream()); + return stream_->Close(); + } + Status Abort() override { + ARROW_RETURN_NOT_OK(InitializeStream()); + return stream_->Abort(); + } + Result Tell() const override { + ARROW_RETURN_NOT_OK(InitializeStream()); + return stream_->TellOr(metadata_.size()); + } + bool closed() const override { + auto status = InitializeStream(); + if (!status.ok()) return true; + return stream_->closed(); + } //@} //@{ // @name Readable Result Read(int64_t nbytes, void* out) override { + ARROW_RETURN_NOT_OK(InitializeStream()); return stream_->Read(nbytes, out); } Result> Read(int64_t nbytes) override { + ARROW_RETURN_NOT_OK(InitializeStream()); return stream_->Read(nbytes); } - const arrow::io::IOContext& io_context() const override { - return stream_->io_context(); - } //@} //@{ @@ -310,9 +319,16 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile { } private: + Status InitializeStream() const { + if (!stream_) { + ARROW_ASSIGN_OR_RAISE(stream_, factory_(gcs::Generation(metadata_.generation()), + gcs::ReadFromOffset())); + } + return Status::OK(); + } InputStreamFactory factory_; gcs::ObjectMetadata metadata_; - std::shared_ptr stream_; + std::shared_ptr mutable stream_; }; google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { @@ -929,12 +945,9 @@ Result> GcsFileSystem::OpenInputFile( auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) { return impl->OpenInputStream(p, g, offset); }; - ARROW_ASSIGN_OR_RAISE(auto stream, - impl_->OpenInputStream(p, gcs::Generation(metadata->generation()), - gcs::ReadFromOffset())); return std::make_shared(std::move(open_stream), - *std::move(metadata), std::move(stream)); + *std::move(metadata)); } Result> GcsFileSystem::OpenInputFile( @@ -947,16 +960,11 @@ Result> GcsFileSystem::OpenInputFile( ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path())); auto metadata = impl_->GetObjectMetadata(p); ARROW_GCS_RETURN_NOT_OK(metadata.status()); - auto impl = impl_; - auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) { + auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset offset) { return impl->OpenInputStream(p, g, offset); }; - ARROW_ASSIGN_OR_RAISE(auto stream, - impl_->OpenInputStream(p, gcs::Generation(metadata->generation()), - gcs::ReadFromOffset())); - return std::make_shared(std::move(open_stream), - *std::move(metadata), std::move(stream)); + *std::move(metadata)); } Result> GcsFileSystem::OpenOutputStream( diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index fb14f7b385d4f..af22087530201 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -1320,6 +1320,22 @@ TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) { } } +TEST_F(GcsIntegrationTest, OpenInputFileIoContext) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + + // Create a test file. + const auto path = PreexistingBucketPath() + "OpenInputFileIoContext/object-name"; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const std::string contents = "The quick brown fox jumps over the lazy dog"; + ASSERT_OK(output->Write(contents.data(), contents.size())); + ASSERT_OK(output->Close()); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path)); + EXPECT_EQ(fs->io_context().external_id(), file->io_context().external_id()); +} + TEST_F(GcsIntegrationTest, OpenInputFileInfo) { auto fs = GcsFileSystem::Make(TestGcsOptions());