diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index ce11c0aa223cb..08099d94f96cb 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -247,32 +247,41 @@ using InputStreamFactory = std::function> class GcsRandomAccessFile : public arrow::io::RandomAccessFile { public: - GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata, - std::shared_ptr stream) - : factory_(std::move(factory)), - metadata_(std::move(metadata)), - stream_(std::move(stream)) {} + GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata) + : factory_(std::move(factory)), metadata_(std::move(metadata)) {} ~GcsRandomAccessFile() override = default; //@{ // @name FileInterface - Status Close() override { return stream_->Close(); } - Status Abort() override { return stream_->Abort(); } - Result Tell() const override { return stream_->TellOr(metadata_.size()); } - bool closed() const override { return stream_->closed(); } + Status Close() override { + ARROW_RETURN_NOT_OK(InitializeStream()); + return stream_->Close(); + } + Status Abort() override { + ARROW_RETURN_NOT_OK(InitializeStream()); + return stream_->Abort(); + } + Result Tell() const override { + ARROW_RETURN_NOT_OK(InitializeStream()); + return stream_->TellOr(metadata_.size()); + } + bool closed() const override { + auto status = InitializeStream(); + if (!status.ok()) return true; + return stream_->closed(); + } //@} //@{ // @name Readable Result Read(int64_t nbytes, void* out) override { + ARROW_RETURN_NOT_OK(InitializeStream()); return stream_->Read(nbytes, out); } Result> Read(int64_t nbytes) override { + ARROW_RETURN_NOT_OK(InitializeStream()); return stream_->Read(nbytes); } - const arrow::io::IOContext& io_context() const override { - return stream_->io_context(); - } //@} //@{ @@ -310,9 +319,16 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile { } private: + Status InitializeStream() const { + if (!stream_) { + ARROW_ASSIGN_OR_RAISE(stream_, factory_(gcs::Generation(metadata_.generation()), + gcs::ReadFromOffset())); + } + return Status::OK(); + } InputStreamFactory factory_; gcs::ObjectMetadata metadata_; - std::shared_ptr stream_; + std::shared_ptr mutable stream_; }; google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { @@ -929,12 +945,9 @@ Result> GcsFileSystem::OpenInputFile( auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) { return impl->OpenInputStream(p, g, offset); }; - ARROW_ASSIGN_OR_RAISE(auto stream, - impl_->OpenInputStream(p, gcs::Generation(metadata->generation()), - gcs::ReadFromOffset())); return std::make_shared(std::move(open_stream), - *std::move(metadata), std::move(stream)); + *std::move(metadata)); } Result> GcsFileSystem::OpenInputFile( @@ -947,16 +960,11 @@ Result> GcsFileSystem::OpenInputFile( ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path())); auto metadata = impl_->GetObjectMetadata(p); ARROW_GCS_RETURN_NOT_OK(metadata.status()); - auto impl = impl_; - auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) { + auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset offset) { return impl->OpenInputStream(p, g, offset); }; - ARROW_ASSIGN_OR_RAISE(auto stream, - impl_->OpenInputStream(p, gcs::Generation(metadata->generation()), - gcs::ReadFromOffset())); - return std::make_shared(std::move(open_stream), - *std::move(metadata), std::move(stream)); + *std::move(metadata)); } Result> GcsFileSystem::OpenOutputStream( diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index fb14f7b385d4f..af22087530201 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -1320,6 +1320,22 @@ TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) { } } +TEST_F(GcsIntegrationTest, OpenInputFileIoContext) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + + // Create a test file. + const auto path = PreexistingBucketPath() + "OpenInputFileIoContext/object-name"; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const std::string contents = "The quick brown fox jumps over the lazy dog"; + ASSERT_OK(output->Write(contents.data(), contents.size())); + ASSERT_OK(output->Close()); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path)); + EXPECT_EQ(fs->io_context().external_id(), file->io_context().external_id()); +} + TEST_F(GcsIntegrationTest, OpenInputFileInfo) { auto fs = GcsFileSystem::Make(TestGcsOptions());