
[C++] Does the dataset API support compression & appending to existing parquet files? #36834

Closed
zenithyr opened this issue Jul 24, 2023 · 5 comments
Labels: Component: C++ · Type: usage (issue is a user question)

Comments

@zenithyr

Describe the usage question you have. Please include as many useful details as possible.

I am streaming time series data to parquet files, with the following requirements:

  • I write the data out batch by batch in order to keep the memory footprint small.
  • The data can optionally be partitioned by {hour, minute}.

I have been using parquet::arrow::FileWriter::WriteRecordBatch, which can append batches to an existing parquet file but doesn't support partitioning.

So I tried the Dataset API. It seems that

  • ExistingDataBehavior can only overwrite or delete existing files; there is no option to append (Q1 below)
  • I don't know how to turn on Snappy compression for the dataset file writer (Q2 below)
// Assumed aliases from the full program: namespace ds = arrow::dataset; namespace fs = arrow::fs;
// base_dir is a std::string defined elsewhere.
arrow::Status append_a_batch(const std::shared_ptr<fs::FileSystem>& filesystem, const std::shared_ptr<arrow::RecordBatch>& batch) {
    // Write using the Dataset API.
    std::vector<std::shared_ptr<arrow::RecordBatch>> batches{batch};
    auto dataset = std::make_shared<ds::InMemoryDataset>(batch->schema(), batches);
    ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
    ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
    // Deliberately not partitioning the data, in order to append to the same file.
    auto partition_schema = arrow::schema({});  // or: arrow::schema({arrow::field("part", arrow::utf8())}) when partitioning

    auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);

    auto format = std::make_shared<ds::ParquetFileFormat>();

    // Q2. How to enable compression?
    ds::FileSystemDatasetWriteOptions write_options;
    write_options.file_write_options = format->DefaultWriteOptions();
    write_options.filesystem = filesystem;
    write_options.base_dir = base_dir;
    write_options.partitioning = partitioning;
    write_options.basename_template = "part{i}.parquet";

    // Q1. How to append to existing files?
    write_options.existing_data_behavior = ds::ExistingDataBehavior::kDeleteMatchingPartitions;
    ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
    return arrow::Status::OK();
}

arrow::Status RunMain() {
    ARROW_ASSIGN_OR_RAISE(auto filesystem, fs::FileSystemFromUriOrPath(base_dir));
    ARROW_RETURN_NOT_OK(filesystem->CreateDir("/tmp/sample"));
    // some_code_generating_one_batch() returns a std::shared_ptr<arrow::RecordBatch>
    auto batch = some_code_generating_one_batch();

    // writes to /tmp/sample/part0.parquet
    ARROW_RETURN_NOT_OK(append_a_batch(filesystem, batch));

    // can't append to /tmp/sample/part0.parquet
    ARROW_RETURN_NOT_OK(append_a_batch(filesystem, batch));

    return arrow::Status::OK();
}

If appending isn't possible, I have to buffer a full partition in memory before flushing it to a file, which uses more memory.

Please advise. Thanks.

Component(s)

C++

@mapleFU (Member) commented Jul 24, 2023

To answer just the second question, about compression:

class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
 public:
  /// \brief Parquet writer properties.
  std::shared_ptr<parquet::WriterProperties> writer_properties;

  /// \brief Parquet Arrow writer properties.
  std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;

 protected:
  explicit ParquetFileWriteOptions(std::shared_ptr<FileFormat> format)
      : FileWriteOptions(std::move(format)) {}

  friend class ParquetFileFormat;
};

Maybe you can configure compression in parquet::WriterProperties.

@zenithyr (Author) commented Jul 24, 2023

> Maybe you can configure compression in parquet::WriterProperties.

Much appreciated! The compression configuration below seems to work:

ds::FileSystemDatasetWriteOptions write_options;

auto format = std::make_shared<ds::ParquetFileFormat>();
auto pq_options = std::dynamic_pointer_cast<arrow::dataset::ParquetFileWriteOptions>(format->DefaultWriteOptions());
pq_options->writer_properties = parquet::WriterProperties::Builder()
        .created_by("1.0")
        ->compression(arrow::Compression::SNAPPY)
        ->build();

write_options.file_write_options = pq_options;
write_options.filesystem = filesystem;
write_options.base_dir = base_dir;
write_options.partitioning = partitioning;
write_options.basename_template = "part{i}.parquet";

Still don't know if Q1 (appending) is possible.

@mapleFU (Member) commented Jul 24, 2023

As for Q1, I guess you can:

  1. TableScan (using the dataset) with partitioning
  2. TableSink (using the dataset) with partitioning

But I don't know if there is a more convenient way to solve it.

@westonpace (Member) commented Jul 24, 2023

> 1. TableScan (using the dataset) with partitioning
> 2. TableSink (using the dataset) with partitioning
>
> But I don't know if there is a more convenient way to solve it.

This would be the easiest way I think. It will require you to create a plan (scan -> write).

Otherwise you can use arrow::dataset::internal::DatasetWriter directly, but it's not part of the public API, so there are no real guarantees of backwards compatibility (although there are no plans to change it at the moment, as far as I am aware).

Also, partitioning happens as part of the write node, not as part of the dataset writer, so you would need to implement that yourself as well. Much better to create a query plan combining the scan and the write, I think. I will see if we have any examples.
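
For illustration, a minimal sketch of such a scan -> write plan, assuming Arrow C++ 12 or later (Acero in arrow::acero, with the dataset "scan" and "write" exec nodes registered via arrow::dataset::internal::Initialize()). The function name ScanAndWrite and the variable names are placeholders; write_options is a FileSystemDatasetWriteOptions configured as in the earlier snippets, including the Parquet compression properties and the partitioning.

#include <arrow/acero/exec_plan.h>
#include <arrow/compute/expression.h>
#include <arrow/dataset/api.h>
#include <arrow/dataset/plan.h>

namespace ac = arrow::acero;
namespace cp = arrow::compute;
namespace ds = arrow::dataset;

arrow::Status ScanAndWrite(const std::shared_ptr<ds::Dataset>& dataset,
                           const ds::FileSystemDatasetWriteOptions& write_options) {
    // Register the "scan" and "write" exec node factories provided by the dataset module.
    ds::internal::Initialize();

    auto scan_options = std::make_shared<ds::ScanOptions>();
    // An empty projection lets the scan emit all of the dataset's columns.
    scan_options->projection = cp::project({}, {});

    // Scan node: produces record batches from the (possibly in-memory) dataset.
    ac::Declaration scan{"scan", ds::ScanNodeOptions{dataset, scan_options}};

    // Write node: partitions the batches and writes them according to write_options
    // (filesystem, base_dir, partitioning, basename_template, Parquet writer properties).
    ds::WriteNodeOptions write_node_options{write_options};
    ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

    // The write node has no output, so just run the plan to completion.
    return ac::DeclarationToStatus(std::move(write));
}

Note that this still writes new files into the partition directories rather than appending to existing ones; a common workaround (an assumption here, not something confirmed in this thread) is to make basename_template unique per call, e.g. by embedding a timestamp, together with ExistingDataBehavior::kOverwriteOrIgnore.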
