Skip to content

Commit

Permalink
Extend Writer to run arbitrary actions
Browse files Browse the repository at this point in the history
  • Loading branch information
n0gu committed Feb 22, 2022
1 parent 6a0e371 commit e5ed73f
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions rust/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! Unlike the transaction API on DeltaTable, this higher level writer will also write out the
//! parquet files

use crate::action::Txn;
use crate::action::Action;
use crate::DeltaTableError;
use arrow::record_batch::RecordBatch;
use log::*;
Expand All @@ -24,7 +24,7 @@ pub struct BufferedJsonWriter {
buffer: HashMap<WriterPartition, Vec<Value>>,
schema: arrow::datatypes::SchemaRef,
partitions: Vec<String>,
txns: Vec<Txn>,
actions: Vec<Action>,
}

impl BufferedJsonWriter {
Expand All @@ -42,7 +42,7 @@ impl BufferedJsonWriter {
schema,
buffer: HashMap::new(),
partitions: metadata.partition_columns,
txns: vec![],
actions: vec![],
})
}

Expand All @@ -52,8 +52,8 @@ impl BufferedJsonWriter {
}

/// Add a txn action to the buffer
pub fn record_txn(&mut self, txn: Txn) {
self.txns.push(txn);
pub fn record_action(&mut self, action: Action) {
self.actions.push(action);
}

/// Write a new Value into the buffer
Expand Down Expand Up @@ -127,12 +127,7 @@ impl BufferedJsonWriter {
}
}

dtx.add_actions(
self.txns
.drain(0..)
.map(crate::action::Action::txn)
.collect(),
);
dtx.add_actions(self.actions.drain(0..).collect());

dtx.commit(None).await?;
self.buffer.clear();
Expand Down

0 comments on commit e5ed73f

Please sign in to comment.