feat: buffered parquet writer #1263

Merged
2 changes: 2 additions & 0 deletions src/cmd/src/datanode.rs
@@ -178,6 +178,7 @@ mod tests {
     use std::io::Write;
     use std::time::Duration;
 
+    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_named_temp_file;
     use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
     use servers::Mode;
@@ -266,6 +267,7 @@ mod tests {
                 max_inflight_tasks: 3,
                 max_files_in_level0: 7,
                 max_purge_tasks: 32,
+                sst_write_buffer_size: ReadableSize::mb(8),
             },
             options.storage.compaction,
         );
4 changes: 4 additions & 0 deletions src/common/base/src/readable_size.rs
@@ -53,6 +53,10 @@ impl ReadableSize {
     pub const fn as_mb(self) -> u64 {
         self.0 / MIB
     }
+
+    pub const fn as_bytes(self) -> u64 {
+        self.0
+    }
 }
 
 impl Div<u64> for ReadableSize {
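The new accessor is small but load-bearing: the SST writer compares its buffered byte count against this value. A minimal usage sketch, using only the constructors and accessors visible in this diff:

```rust
use common_base::readable_size::ReadableSize;

fn main() {
    let buffer_size = ReadableSize::mb(8);
    // `as_mb` truncates to whole mebibytes; `as_bytes` (added in this PR)
    // returns the exact count the writer compares against its buffer length.
    assert_eq!(buffer_size.as_mb(), 8);
    assert_eq!(buffer_size.as_bytes(), 8 * 1024 * 1024);
}
```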
4 changes: 4 additions & 0 deletions src/datanode/src/datanode.rs
@@ -150,6 +150,8 @@ pub struct CompactionConfig {
     pub max_files_in_level0: usize,
     /// Max task number for SST purge task after compaction.
     pub max_purge_tasks: usize,
+    /// Buffer threshold while writing SST files
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl Default for CompactionConfig {
@@ -158,6 +160,7 @@ impl Default for CompactionConfig {
             max_inflight_tasks: 4,
             max_files_in_level0: 8,
             max_purge_tasks: 32,
+            sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
 }
@@ -177,6 +180,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
             manifest_gc_duration: value.storage.manifest.gc_duration,
             max_files_in_l0: value.storage.compaction.max_files_in_level0,
             max_purge_tasks: value.storage.compaction.max_purge_tasks,
+            sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
         }
     }
 }
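For illustration, a sketch of overriding the new knob in code. The field names and the `Default` impl come from the struct above; the 32 MiB value is purely hypothetical:

```rust
use common_base::readable_size::ReadableSize;
use datanode::datanode::CompactionConfig;

fn custom_compaction_config() -> CompactionConfig {
    // Larger buffers mean fewer, larger flushes per SST file; smaller
    // buffers cap the peak memory held by each in-flight write.
    CompactionConfig {
        sst_write_buffer_size: ReadableSize::mb(32),
        ..Default::default()
    }
}
```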
12 changes: 6 additions & 6 deletions src/datanode/src/error.rs
@@ -439,12 +439,6 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Failed to write parquet file, source: {}", source))]
-    WriteParquet {
-        source: parquet::errors::ParquetError,
-        backtrace: Backtrace,
-    },
-
     #[snafu(display("Failed to poll stream, source: {}", source))]
     PollStream {
         source: datafusion_common::DataFusionError,
@@ -514,6 +508,12 @@ pub enum Error {
         #[snafu(backtrace)]
         source: BoxedError,
     },
+
+    #[snafu(display("Failed to copy table to parquet file, source: {}", source))]
+    WriteParquet {
+        #[snafu(backtrace)]
+        source: storage::error::Error,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
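The `WriteParquet` variant now wraps the storage layer's error (with `#[snafu(backtrace)]` forwarding the inner backtrace) rather than a raw `parquet::errors::ParquetError`. A self-contained sketch of the same snafu pattern, with `std::io::Error` standing in for `storage::error::Error`:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to copy table to parquet file, source: {}", source))]
    WriteParquet { source: std::io::Error },
}

fn copy_table() -> Result<(), Error> {
    // `context` converts the inner error into the `source` field of the
    // `WriteParquet` variant; this mirrors how the datanode wraps the
    // storage error at the call site in copy_table_to.rs below.
    std::fs::read("missing.parquet").context(WriteParquetSnafu)?;
    Ok(())
}
```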
108 changes: 11 additions & 97 deletions src/datanode/src/sql/copy_table_to.rs
@@ -12,24 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::pin::Pin;
-
-use common_datasource;
 use common_datasource::object_store::{build_backend, parse_url};
 use common_query::physical_plan::SessionContext;
 use common_query::Output;
 use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
-use datafusion::parquet::arrow::ArrowWriter;
-use datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
-use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::physical_plan::RecordBatchStream;
-use futures::TryStreamExt;
-use object_store::ObjectStore;
 use snafu::ResultExt;
+use storage::sst::SstInfo;
+use storage::{ParquetWriter, Source};
 use table::engine::TableReference;
 use table::requests::CopyTableRequest;
 
-use crate::error::{self, Result};
+use crate::error::{self, Result, WriteParquetSnafu};
 use crate::sql::SqlHandler;
 
 impl SqlHandler {
@@ -51,99 +44,20 @@ impl SqlHandler {
         let stream = stream
             .execute(0, SessionContext::default().task_ctx())
             .context(error::TableScanExecSnafu)?;
         let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
 
         let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
         let object_store =
             build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
 
-        let mut parquet_writer = ParquetWriter::new(path.to_string(), stream, object_store);
-        // TODO(jiachun):
-        // For now, COPY is implemented synchronously.
-        // When copying large table, it will be blocked for a long time.
-        // Maybe we should make "copy" runs in background?
-        // Like PG: https://www.postgresql.org/docs/current/sql-copy.html
-        let rows = parquet_writer.flush().await?;
-
-        Ok(Output::AffectedRows(rows))
-    }
-}
-
-type DfRecordBatchStream = Pin<Box<DfRecordBatchStreamAdapter>>;
-
-struct ParquetWriter {
-    file_name: String,
-    stream: DfRecordBatchStream,
-    object_store: ObjectStore,
-    max_row_group_size: usize,
-    max_rows_in_segment: usize,
-}
-
-impl ParquetWriter {
-    pub fn new(file_name: String, stream: DfRecordBatchStream, object_store: ObjectStore) -> Self {
-        Self {
-            file_name,
-            stream,
-            object_store,
-            // TODO(jiachun): make these configurable: WITH (max_row_group_size=xxx, max_rows_in_segment=xxx)
-            max_row_group_size: 4096,
-            max_rows_in_segment: 5000000, // default 5M rows per segment
-        }
-    }
-
-    pub async fn flush(&mut self) -> Result<usize> {
-        let schema = self.stream.as_ref().schema();
-        let writer_props = WriterProperties::builder()
-            .set_compression(Compression::ZSTD(ZstdLevel::default()))
-            .set_encoding(Encoding::PLAIN)
-            .set_max_row_group_size(self.max_row_group_size)
-            .build();
-        let mut total_rows = 0;
-        loop {
-            let mut buf = vec![];
-            let mut arrow_writer =
-                ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props.clone()))
-                    .context(error::WriteParquetSnafu)?;
-
-            let mut rows = 0;
-            let mut end_loop = true;
-            // TODO(hl & jiachun): Since OpenDAL's writer is async and ArrowWriter requires a `std::io::Write`,
-            // here we use a Vec<u8> to buffer all parquet bytes in memory and write to object store
-            // at a time. Maybe we should find a better way to bridge ArrowWriter and OpenDAL's object.
-            while let Some(batch) = self
-                .stream
-                .try_next()
-                .await
-                .context(error::PollStreamSnafu)?
-            {
-                arrow_writer
-                    .write(&batch)
-                    .context(error::WriteParquetSnafu)?;
-                rows += batch.num_rows();
-                if rows >= self.max_rows_in_segment {
-                    end_loop = false;
-                    break;
-                }
-            }
-
-            let start_row_num = total_rows + 1;
-            total_rows += rows;
-            arrow_writer.close().context(error::WriteParquetSnafu)?;
-
-            // if rows == 0, we just end up with an empty file.
-            //
-            // file_name like:
-            // "file_name_1_1000000" (row num: 1 ~ 1000000),
-            // "file_name_1000001_xxx" (row num: 1000001 ~ xxx)
-            let file_name = format!("{}_{}_{}", self.file_name, start_row_num, total_rows);
-            self.object_store
-                .write(&file_name, buf)
-                .await
-                .context(error::WriteObjectSnafu { path: file_name })?;
-
-            if end_loop {
-                return Ok(total_rows);
-            }
-        }
+        let writer = ParquetWriter::new(&path, Source::Stream(stream), object_store);
+
+        let rows_copied = writer
+            .write_sst(&storage::sst::WriteOptions::default())
+            .await
+            .context(WriteParquetSnafu)?
+            .map(|SstInfo { num_rows, .. }| num_rows)
+            .unwrap_or(0);
+
+        Ok(Output::AffectedRows(rows_copied))
     }
 }
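The deleted code is the motivation for the PR title: `ArrowWriter` needs a `std::io::Write`, so the old COPY path buffered every parquet byte of a segment in a `Vec<u8>` and uploaded it in a single `object_store.write` call. The shared `storage::ParquetWriter` instead flushes whenever its buffer crosses `sst_write_buffer_size`. A simplified, self-contained sketch of that threshold pattern (not the actual `storage` implementation):

```rust
use std::io::Write;

/// Simplified stand-in for the buffered SST writer: bytes accumulate in
/// memory, but every time the buffer crosses `threshold` it is drained to
/// the sink, so memory stays bounded regardless of table size.
struct BufferedSink<W: Write> {
    buffer: Vec<u8>,
    threshold: usize, // e.g. sst_write_buffer_size.as_bytes() as usize
    sink: W,
}

impl<W: Write> BufferedSink<W> {
    fn new(sink: W, threshold: usize) -> Self {
        Self { buffer: Vec::new(), threshold, sink }
    }

    fn write_chunk(&mut self, chunk: &[u8]) -> std::io::Result<()> {
        self.buffer.extend_from_slice(chunk);
        if self.buffer.len() >= self.threshold {
            // Drain the buffer to the sink instead of letting it grow.
            self.sink.write_all(&self.buffer)?;
            self.buffer.clear();
        }
        Ok(())
    }

    fn finish(mut self) -> std::io::Result<()> {
        // Flush whatever remains below the threshold, then the sink itself.
        self.sink.write_all(&self.buffer)?;
        self.sink.flush()
    }
}
```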
2 changes: 1 addition & 1 deletion src/object-store/src/lib.rs
@@ -16,7 +16,7 @@ pub use opendal::raw::normalize_path as raw_normalize_path;
 pub use opendal::raw::oio::Pager;
 pub use opendal::{
     layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
-    Operator as ObjectStore, Result,
+    Operator as ObjectStore, Result, Writer,
 };
 
 pub mod cache_policy;
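Re-exporting opendal's `Writer` is what lets the storage crate hand each flushed buffer to the backend incrementally instead of materializing whole objects. A hedged sketch of streaming writes: `writer`, `write`, and `close` exist on recent opendal releases, but the exact signatures depend on the opendal version this workspace pins:

```rust
use object_store::{ObjectStore, Result}; // re-exports of opendal's Operator etc.

async fn stream_parts(store: &ObjectStore, path: &str, parts: Vec<Vec<u8>>) -> Result<()> {
    let mut writer = store.writer(path).await?;
    for part in parts {
        // Each call hands one flushed buffer to the backend instead of
        // accumulating the whole object in memory first.
        writer.write(part).await?;
    }
    writer.close().await?;
    Ok(())
}
```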
1 change: 1 addition & 0 deletions src/storage/src/compaction/picker.rs
@@ -132,6 +132,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
             wal: req.wal.clone(),
             manifest: req.manifest.clone(),
             expired_ssts,
+            sst_write_buffer_size: req.sst_write_buffer_size,
         }));
     }
 
3 changes: 3 additions & 0 deletions src/storage/src/compaction/scheduler.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, error, info};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
@@ -60,6 +61,8 @@ pub struct CompactionRequestImpl<S: LogStore> {
     pub ttl: Option<Duration>,
     /// Compaction result sender.
     pub sender: Option<Sender<Result<()>>>,
+
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl<S: LogStore> CompactionRequestImpl<S> {
16 changes: 11 additions & 5 deletions src/storage/src/compaction/task.rs
@@ -15,6 +15,7 @@
 use std::collections::HashSet;
 use std::fmt::{Debug, Formatter};
 
+use common_base::readable_size::ReadableSize;
 use common_telemetry::{error, info};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
@@ -46,6 +47,7 @@ pub struct CompactionTaskImpl<S: LogStore> {
     pub wal: Wal<S>,
     pub manifest: RegionManifest,
     pub expired_ssts: Vec<FileHandle>,
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl<S: LogStore> Debug for CompactionTaskImpl<S> {
@@ -71,14 +73,14 @@ impl<S: LogStore> CompactionTaskImpl<S> {
         for output in self.outputs.drain(..) {
             let schema = self.schema.clone();
             let sst_layer = self.sst_layer.clone();
+            let sst_write_buffer_size = self.sst_write_buffer_size;
             compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
 
             // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
             futs.push(async move {
-                match output.build(region_id, schema, sst_layer).await {
-                    Ok(meta) => Ok(meta),
-                    Err(e) => Err(e),
-                }
+                output
+                    .build(region_id, schema, sst_layer, sst_write_buffer_size)
+                    .await
             });
         }
 
@@ -172,6 +174,7 @@ impl CompactionOutput {
         region_id: RegionId,
         schema: RegionSchemaRef,
         sst_layer: AccessLayerRef,
+        sst_write_buffer_size: ReadableSize,
     ) -> Result<Option<FileMeta>> {
         let reader = build_sst_reader(
             schema,
@@ -183,7 +186,9 @@
         .await?;
 
         let output_file_id = FileId::random();
-        let opts = WriteOptions {};
+        let opts = WriteOptions {
+            sst_write_buffer_size,
+        };
 
         Ok(sst_layer
             .write_sst(output_file_id, Source::Reader(reader), &opts)
@@ -192,6 +197,7 @@
                 |SstInfo {
                     time_range,
                     file_size,
+                    ..
                 }| FileMeta {
                     region_id,
                     file_id: output_file_id,
6 changes: 5 additions & 1 deletion src/storage/src/compaction/writer.rs
@@ -85,6 +85,7 @@ mod tests {
     use std::sync::atomic::{AtomicU64, Ordering};
     use std::sync::Arc;
 
+    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_temp_dir;
     use common_time::Timestamp;
     use datatypes::prelude::{LogicalTypeId, ScalarVector, ScalarVectorBuilder};
@@ -224,6 +225,7 @@ mod tests {
         let SstInfo {
             time_range,
             file_size,
+            ..
         } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
@@ -411,7 +413,9 @@ mod tests {
             .await
             .unwrap();
 
-        let opts = WriteOptions {};
+        let opts = WriteOptions {
+            sst_write_buffer_size: ReadableSize::mb(8),
+        };
         let s1 = ParquetWriter::new(
             &output_file_ids[0].as_parquet(),
             Source::Reader(reader1),
4 changes: 4 additions & 0 deletions src/storage/src/config.rs
@@ -16,12 +16,15 @@
 
 use std::time::Duration;
 
+use common_base::readable_size::ReadableSize;
+
 #[derive(Debug, Clone)]
 pub struct EngineConfig {
     pub manifest_checkpoint_margin: Option<u16>,
     pub manifest_gc_duration: Option<Duration>,
     pub max_files_in_l0: usize,
     pub max_purge_tasks: usize,
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl Default for EngineConfig {
@@ -31,6 +34,7 @@ impl Default for EngineConfig {
             manifest_gc_duration: Some(Duration::from_secs(30)),
             max_files_in_l0: 8,
             max_purge_tasks: 32,
+            sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
 }
2 changes: 1 addition & 1 deletion src/storage/src/file_purger.rs
@@ -147,7 +147,7 @@ mod tests {
         let sst_path = "table1";
         let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
         let sst_info = layer
-            .write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {})
+            .write_sst(sst_file_id, Source::Iter(iter), &WriteOptions::default())
            .await
            .unwrap()
            .unwrap();