feat: rewrite operations #852
Conversation
)
};

// TODO configure more permissive versions based on configuration. Also how should this ideally be handled?
We should write a function that looks at a list of actions, detects which features are used, and then determines the protocol versions. It will be much easier if that logic is the responsibility of a single function rather than spread out.
I think for updates we only need to run that function on new actions and merge the result with the existing protocol versions on the table. There's probably also some concurrency resolution that needs to happen, but I haven't thought about that part much yet.
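For illustration, a minimal sketch of what such a function could look like — the action and metadata types, the feature check, and the version numbers below are stand-ins, not the actual feature-to-protocol mapping:

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the crate's action/metadata types (not the real ones).
struct Metadata {
    configuration: HashMap<String, Option<String>>,
}

enum Action {
    Metadata(Metadata),
    Add,
    Remove,
}

/// Inspect all actions of a commit and derive the minimum reader/writer versions.
/// The feature check and version numbers are placeholders for the real mapping.
fn protocol_versions_for(actions: &[Action]) -> (i32, i32) {
    let (mut min_reader, mut min_writer) = (1, 2);
    for action in actions {
        if let Action::Metadata(meta) = action {
            // Example: column mapping would require newer reader/writer versions.
            if meta.configuration.contains_key("delta.columnMapping.mode") {
                min_reader = min_reader.max(2);
                min_writer = min_writer.max(5);
            }
        }
    }
    (min_reader, min_writer)
}
```

For updates, the result of this function on the new actions could then be merged (via max) with the protocol versions already recorded on the table.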
I might even say we shouldn't let clients set the protocol; we should consider that the responsibility of the library. What do you think? Are there use cases where they should set it?
I think you are right - having the library choose the protocol and defaulting to "what Spark uses" seems like the way to go. We may in that case have to check option compatibility though?
if table.object_store().is_delta_table_location().await? {
    match mode {
        SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
        SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
Why is append not allowed?
it's what spark does :)
rust/src/operations/create.rs
Outdated
SaveMode::Overwrite => {
    let curr_files =
        flatten_list_stream(table.object_store().as_ref(), None).await?;
    table.object_store().delete_batch(&curr_files).await?;
Why delete the current files?
Well .. in this case I was not sure, but conceptually I felt we are not overwriting the data in a table and creating a new version, but creating an entirely new table at version 0. If we were to also support updating table metadata, schema, etc. via this route, I guess there is more work to be done here validating all changes.
But this one I was definitely unsure about.
Had a look at the Spark implementation - seems they agree with you, and updating the metadata and evolving the table is the way to go.
Since the metadata update is a larger operation, I opted for raising not implemented for now in this PR. I opened #917 to track this.
I really like these new Builder APIs so far.
I do still like the idea of having a three-tier API (action-based, engine-agnostic, DataFusion-based). I think for clarity it would be best to have those in separate modules of the crate. So, for example, I don't think the Create command and the Load/Write commands belong in the same module, since for the first you pass in actions (low-level) while the others are more high-level and deal with data. I'll think about this some more though, since I'm not 100% sure this makes sense.
Okay, I think I mis-read the purpose of Create: it's not a low-level API, it's just to create a table, so there isn't any data-writing interaction involved. So we should just think of
I'm only halfway through and will try to wrap up reviewing tomorrow. Have some initial comments / suggestions.
/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
/// THe main purpose of in-memory tables is for use in testing.
/// THe main purpose of in-memory tables is for use in testing.
/// The main purpose of in-memory tables is for use in testing. |
/// let ops = DeltaOps::new_in_memory();
/// ```
#[must_use]
pub fn new_in_memory() -> Self {
This is very cool!
///
/// let ops = DeltaOps::new_in_memory();
/// ```
#[must_use]
This is new to me. Why #[must_use]?
I also only fairly recently learned about must_use. The idea is, if you do not consume the result of this call, clippy (or even the compiler) will complain, which to me makes sense, since if the return is not used the call literally does nothing and should be removed.
I think futures, for instance, may also be must_use, since if not consumed they also do nothing ...
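For illustration, a minimal standalone example of how the attribute behaves (the function is made up, not the crate's API):

```rust
#[must_use = "creating the value does nothing unless the result is used"]
fn new_in_memory_example() -> String {
    String::from("in-memory table")
}

fn main() {
    // Warns (unused_must_use): the return value is silently dropped.
    new_in_memory_example();

    // Fine: the result is consumed.
    let _ops = new_in_memory_example();
}
```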
#[error("Tried committing existing table version: {0}")]
VersionAlreadyExists(DeltaDataTypeVersion),

/// Error returned when reading the delta log object failed.
Is this description accurate? It's duplicated below, and reading and serializing seem like opposites.
Finished looking through, and a few more comments, mostly around cleanup and follow-up issues.
/// Low-level transaction API. Creates a temporary commit file. Once created,
/// the transaction object could be dropped and the actual commit could be executed
/// with `DeltaTable.try_commit_transaction`.
async fn prepare_commit(
Is this meant to replace DeltaTransaction::PrepareCommit? Or is it different?
This is correct. I wanted to iterate a bit more on commits (as well as the writer) before adopting it in the main code paths. Next I wanted to finally look into the conflict resolution again, where I expect more changes to the commit behavior.
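For readers following along, a rough standalone sketch of the two-phase flow described in the doc comment above — the paths, signatures, and names here are placeholders, not the crate's actual API:

```rust
// Step 1 (prepare_commit): serialize the actions and stage them in a temporary file.
// Step 2 (try_commit_transaction): atomically rename the staged file to the next
// versioned log entry; if that version already exists, another writer won and the
// caller retries with version + 1 after conflict resolution.
struct PreparedCommit {
    tmp_path: String,
}

fn prepare_commit(actions_json: &str, token: &str) -> PreparedCommit {
    let tmp_path = format!("_delta_log/_commit_{token}.json.tmp");
    // An object-store put(tmp_path, actions_json) would happen here.
    let _ = actions_json;
    PreparedCommit { tmp_path }
}

fn try_commit_transaction(prepared: &PreparedCommit, version: i64) -> Result<i64, String> {
    let final_path = format!("_delta_log/{version:020}.json");
    // A rename-if-not-exists(prepared.tmp_path -> final_path) would happen here.
    let _ = (&prepared.tmp_path, final_path);
    Ok(version)
}
```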
}
}

impl ExecutionPlan for WriteCommand {
    fn as_any(&self) -> &dyn Any {
impl WriteBuilder {
Could we also allow passing the Parquet WriterProperties into this builder? I think users would want to be able to control the max_row_group_size and other options from there.
I went back and forth a bit on this. If we eventually want to support both parquet implementations, maybe we should not expose the specific option structs directly; on the other hand, there are more options to consider...
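For reference, one possible (hypothetical) shape for such an API — `WriterProperties` is the parquet crate's type, while the builder field and method below are illustrative:

```rust
use parquet::file::properties::WriterProperties;

// Hypothetical sketch only; the real WriteBuilder has more fields and may differ.
pub struct WriteBuilder {
    writer_properties: Option<WriterProperties>,
}

impl WriteBuilder {
    /// Supply custom Parquet writer properties, e.g. to tune the row group size.
    pub fn with_writer_properties(mut self, props: WriterProperties) -> Self {
        self.writer_properties = Some(props);
        self
    }
}

// Possible usage:
// let props = WriterProperties::builder()
//     .set_max_row_group_size(128 * 1024)
//     .build();
// let builder = write_builder.with_writer_properties(props);
```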
if batches.is_empty() {
    Err(WriteError::MissingData)
Could we instead early return if there is no data?
I guess ... :). If we do, we need to either make it a no-op or add some logic to handle the case where we do not have an explicitly defined table schema, in case we need to create the table.
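A small standalone sketch of the guard being discussed — names are illustrative, not the actual builder internals:

```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

#[derive(Debug)]
enum WriteError {
    MissingData,
}

// An empty write can only become a no-op (or create an empty table) if a schema is
// known from somewhere other than the batches themselves.
fn schema_for_write(
    batches: &[RecordBatch],
    explicit_schema: Option<SchemaRef>,
) -> Result<SchemaRef, WriteError> {
    match (batches.first(), explicit_schema) {
        (Some(batch), _) => Ok(batch.schema()),
        (None, Some(schema)) => Ok(schema),
        (None, None) => Err(WriteError::MissingData),
    }
}
```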
}?;

let plan = if let Some(plan) = this.input {
    Ok(plan)
is the plan guaranteed to be partitioned correctly?
No, it is not ... or rather, I did think about wrapping that, but this moved to the writer now ... I'll check more explicitly that we handle this correctly.
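If partitioning does end up being normalized explicitly, it could look roughly like this sketch using DataFusion's RepartitionExec (the wrapper function and where it would be called are assumptions):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

// Wrap an arbitrary input plan so downstream writers see a known partitioning,
// regardless of how the caller constructed the plan.
fn with_target_partitions(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(Arc::new(RepartitionExec::try_new(
        plan,
        Partitioning::RoundRobinBatch(target_partitions),
    )?))
}
```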
Ok(_) => Ok(true),
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(err) => Err(err),
// TODO We should really be using HEAD here, but this fails in windows tests
Have you created a ticket for this in object-store? I can look into this.
No I have not yet - Windows gives a permission denied error, and I wanted to investigate if that is a general / expected behavior. At least in principle there should not be any permission issues, since the tests run in a generated temp folder...
Co-authored-by: Will Jones <willjones127@gmail.com>
[profile.dev]
split-debuginfo = "unpacked"
Not entirely sure why we had this option. However, it seems it is only stable on macOS, and it was causing build issues on Windows after 1.65 was released, so I removed it.
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
@wjones127 - sorry for letting this sit for so long. Did some minor tweaks, mainly related to some deprecations in the latest chrono, and resolved some conflicts with main. Could you re-approve? :)
No worries. Thanks for coming back to it. I'm excited to get this merged :)
Description
This PR incorporates some of the learnings with regards to how datafusion "should" be used (or what I think I understood so far about how it should be used) and how this applies to our operations module. It also embraces the IntoFuture trait stabilized in Rust 1.64. More specifically:

- command builders can be awaited directly via IntoFuture
- errors are mapped to DeltaTableError (the top-level error variants likely have to be refined; for now, many command errors map to a GenericError, which at least shows meaningful error messages)
- a new PartitionWriter implementation that allows more fine-granular control over how data is written

Some of the choices or trials here mainly (hopefully) make sense when viewed as preparation for what's to come.
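To illustrate the builder-plus-IntoFuture pattern described here, a simplified standalone sketch (the types, fields, and error handling are stand-ins, not the crate's actual API):

```rust
use std::future::{Future, IntoFuture};
use std::pin::Pin;

// Simplified stand-ins for the crate's table and builder types.
pub struct DeltaTable {
    pub version: i64,
}

pub struct CreateBuilder {
    name: Option<String>,
}

impl CreateBuilder {
    pub fn new() -> Self {
        Self { name: None }
    }

    pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
        self.name = Some(name.into());
        self
    }
}

impl IntoFuture for CreateBuilder {
    type Output = Result<DeltaTable, String>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            // A real implementation would commit protocol + metadata actions here.
            let _name = self.name.ok_or_else(|| "table name is required".to_string())?;
            Ok(DeltaTable { version: 0 })
        })
    }
}

// Because the builder implements IntoFuture, callers can await it directly:
// let table = CreateBuilder::new().with_table_name("my_table").await?;
```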
To keep it somewhat simpler to review, I tried to keep the major changes contained in the operations module. If we adopt this, the idea is to migrate all existing operations (create, vacuum, optimize) to the builder pattern and into the operations module, and have the methods on DeltaTable just return a pre-populated builder. I think this is where IntoFuture shines, as we can await them just like before.

Implementing this, one of the things I found most convenient is that we now have the memory:// store available. Coupled with the sync_stores helper, it makes setting up and validating test cases that mutate an existing table very convenient - yay object_store 😆.

There are some things I want to clean up, but it would be great to get some feedback on whether this is where we want to go - @houqp @wjones127 @fvaleye.
Related Issue(s)
Documentation