-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[write stage0] add Transaction + commit path #370
base: main
Are you sure you want to change the base?
Conversation
8c11dc0
to
740d112
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #370 +/- ##
==========================================
+ Coverage 76.86% 77.19% +0.32%
==========================================
Files 47 48 +1
Lines 9436 9637 +201
Branches 9436 9637 +201
==========================================
+ Hits 7253 7439 +186
- Misses 1789 1794 +5
- Partials 394 404 +10 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no changes, just moved from kernel/src/transaction.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now in #386
f1dcda9
to
5779f06
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some style nits
&self, | ||
path: &Url, | ||
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>, | ||
overwrite: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you say "overwrite isn't used for the existing commit flow", does that mean that overwrite will not sidestep the PutIfAbsent? If so, I think that could be clarified a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm. all i mean to say is that we don't actually use this param yet. (will be used in the future to support writing _last_checkpoint
)
kernel/src/transaction.rs
Outdated
let action_schema = Arc::new(engine_commit_info.as_ref().map_or( | ||
get_log_schema().clone(), | ||
|commit_info| { | ||
let mut action_fields = get_log_schema().fields().collect::<Vec<_>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This is a bit much, but if fields()
's signature was changed to return a DoubleEndedIterator
, you can also avoid materializing this, use next_back
, and remove the into_iter
down below. Tho I imagine action_fields
is pretty small anyway.
let mut action_fields = get_log_schema().fields();
let commit_info_field = action_fields
.next_back()
.expect("last field is commit_info in action schema");
//...
let action_fields = action_fields.cloned().chain(std::iter::once(
crate::schema::StructField::new(COMMIT_INFO_NAME, commit_info_schema, true),
));
StructType::new(action_fields.collect())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha nice I like this and will keep open to consider in the future but probably punting on it in this PR?
Ok(Table::new(table_path)) | ||
} | ||
|
||
#[tokio::test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can setup tracing using #[test_log::test(tokio::test)]
and avoid the try_init. Relevant docs for test_log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm I'm curious on peoples' thoughts on this.. seems like the init is a simple step but maybe it is nice to use test_log? I haven't been using it since it feels clearer for me to use typical #[tokio::test]
and just to manual init
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flushing comments at EOD... will try to finish the review ASAP.
.put_opts( | ||
&Path::from(path.path()), | ||
buffer.into(), | ||
object_store::PutMode::Create.into(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: Does object_store support the new S3 put-if-absent capability?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good question! theres a recent issue but I haven't followed up much yet on it: apache/arrow-rs#6285
fn write_json_file( | ||
&self, | ||
path: &url::Url, | ||
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, why does this need a lifetime specification?
It doesn't outlive the method call and isn't ever passed to any async code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also: can we get away with this as a workaround?
data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>, | |
data: impl Iterator<Item = Box<dyn EngineData>> + Send, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lifetime specification is since it defaults to 'static
and we don't want to impose that requirement. seemed odd to me that it would default to static but didn't take too much time to dive into it yet. and secondarily I think if we use impl Iterator
it causes the trait not to be object-safe
Some(Transaction { | ||
app_id: "my-app2".to_owned(), | ||
version: 2, | ||
last_updated: None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: I'm surprised cargo fmt
didn't add a trailing comma here (and again below)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting. just reran it looks like you're right: it doesnt
impl std::fmt::Debug for Transaction { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.write_str(&format!( | ||
"Transaction {{ read_snapshot version: {}, commit_info: {} }}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside: Does this handle {:#?}
formatting?
(I've been wondering how custom Debug
would work in that case, ever since learning about it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this specific implementation doesn't but it is all handled in the Debug
trait via Formatter's alternate()
. if we wanted to handle pretty printing it would be something like
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if f.alternate() {
// Pretty-print formatting when using {:#?}
} else {
// Normal formatting
f.write_str(...);
}
}
kernel/src/transaction.rs
Outdated
/// Create a new transaction from a snapshot. The snapshot will be used to read the current | ||
/// state of the table (e.g. to read the current version). | ||
/// | ||
/// Instead of using this API, the more typical API is | ||
/// [Table::new_transaction](crate::table::Table::new_transaction) to create a transaction from | ||
/// a table automatically backed by the latest snapshot. | ||
pub fn new(snapshot: impl Into<Arc<Snapshot>>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't want people to use it, can we just make it private?
/// Create a new transaction from a snapshot. The snapshot will be used to read the current | |
/// state of the table (e.g. to read the current version). | |
/// | |
/// Instead of using this API, the more typical API is | |
/// [Table::new_transaction](crate::table::Table::new_transaction) to create a transaction from | |
/// a table automatically backed by the latest snapshot. | |
pub fn new(snapshot: impl Into<Arc<Snapshot>>) -> Self { | |
/// Create a new transaction from a snapshot. The snapshot will be used to read the current | |
/// state of the table (e.g. to read the current version). | |
fn new(snapshot: impl Into<Arc<Snapshot>>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure probably makes sense to implement a smaller API for now and open this up if people ask for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(made pub(crate)
)
Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
This PR does 4 main things:
reorganizeEDIT now in movetransaction.rs
so that the transaction action is now moved to actions moduletransaction
module intoactions/
and rename toset_transaction
#386Transaction
API which includes:a.
Table.new_transaction()
b.
Transaction.commit_info(data, schema)
c.
Transaction.commit() // consumes transaction
write_json_file(impl Iterator<Item = Box<dyn EngineData>>)
(and a default engine implementation for this)write.rs
to house many of our write tests as it's implementedresolves #378