Unexpected behavior in update() #123

Closed
dispanser opened this issue Mar 14, 2021 · 3 comments · Fixed by #124
@dispanser
Contributor

dispanser commented Mar 14, 2021

I believe there's a subtle bug in the update() method, in how it decides between
loading from a checkpoint and applying the log from json files.

Here are the important bits:

Ok(last_check_point) => {
	if self.last_check_point.is_none()
		|| self.last_check_point == Some(last_check_point)
	{
		self.last_check_point = Some(last_check_point);
		self.restore_checkpoint(last_check_point).await?;
		self.version = last_check_point.version + 1;
	}
}

What this does:

  • load from the checkpoint if there is no previously used checkpoint, or if the new checkpoint is the same as the previously used one
  • if the new checkpoint is different, don't use it, but apply the json deltas instead.

I believe what we want instead is use the json deltas if the new checkpoint is the
same as the previous one, and use the checkpoint if it is newer than what we previously
had loaded.

Note that the final result - an up to date delta table - is still achieved, but it
unnecessarily loads checkpoints or json deltas in either scenario, so it could be
more efficient.
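To make the difference concrete, here is a minimal sketch that isolates the condition as a pure function. The `CheckPoint` struct is a simplified stand-in (no `parts` field), and the helper names `restores_today` and `restores_proposed` are hypothetical, not part of the crate. The proposed variant is only one possible reading of "use the checkpoint if it is newer": it treats any checkpoint that differs from the previous one as newer.

```rust
/// Simplified stand-in for the checkpoint metadata (assumption: `parts` omitted).
#[derive(Debug, Clone, Copy, PartialEq)]
struct CheckPoint {
    version: i64,
    size: i64,
}

/// Hypothetical helper mirroring the condition quoted above:
/// restore when there is no previous checkpoint or when it is unchanged.
fn restores_today(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.is_none() || previous == Some(latest)
}

/// Hypothetical helper for the suggested behavior: restore when there is no
/// previous checkpoint or when the latest checkpoint differs from it.
/// (This is equivalent to `previous != Some(latest)` on its own.)
fn restores_proposed(previous: Option<CheckPoint>, latest: CheckPoint) -> bool {
    previous.is_none() || previous != Some(latest)
}

fn main() {
    let v10 = CheckPoint { version: 10, size: 13 };
    let v20 = CheckPoint { version: 20, size: 23 };

    // The three situations that occur in the walkthrough below.
    for (previous, latest) in [(None, v10), (Some(v10), v10), (Some(v10), v20)] {
        println!(
            "previous = {:?}, latest = {:?}: today = {}, proposed = {}",
            previous,
            latest,
            restores_today(previous, latest),
            restores_proposed(previous, latest),
        );
    }
}
```

The two variants agree only when there is no previous checkpoint; in the other two cases they make opposite choices.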

I wasn't able to write a self-contained test inside rust, as I'm not sure about
the status of write support (and checkpointing in particular), but I validated
my assumptions by running a spark shell session to generate commits and a rust
session (with some println! sparkles) side by side, based on the following
"data generator" in scala:

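// Appends numCommits single-row commits to the delta table at deltaUrl
// (run in spark-shell, where spark.implicits._ is already in scope).
def createCommits(numCommits: Int, deltaUrl: String): Unit =
  (0 until numCommits).foreach { c =>
    Seq(c).toDF("version").write.format("delta").mode("append").save(deltaUrl)
  }

  1. start: 5 commits, createCommits(5, "<some delta path>")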
reading delta table: "<some delta path>"
[apply_logs_after_current_version] applied 0
[apply_logs_after_current_version] applied 1
[apply_logs_after_current_version] applied 2
[apply_logs_after_current_version] applied 3
[apply_logs_after_current_version] applied 4
[apply_logs_after_current_version] end of log : 5, rollback
initial table loaded, version 4
waiting on keypress for update, current version 4

Init: loading the table, all good.

  2. 4 more commits: no checkpoints yet
updating ...
[update] process started
[update] no previous checkpoint, trying version = 5 ff
[apply_logs_after_current_version] applied 5
[apply_logs_after_current_version] applied 6
[apply_logs_after_current_version] applied 7
[apply_logs_after_current_version] applied 8
[apply_logs_after_current_version] end of log : 9, rollback
waiting on keypress for update, current version 8

Expected behavior: start with our current version and apply new commits in sequence.

  3. 4 more commits: new checkpoint at v10
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = None
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] end of log : 13, rollback
waiting on keypress for update, current version 12

Expected behavior: load the checkpoint, apply changes from there.

  4. 4 more commits: no new checkpoint, still at v10
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] applied 13
[apply_logs_after_current_version] applied 14
[apply_logs_after_current_version] applied 15
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] end of log : 17, rollback
waiting on keypress for update, current version 16

Unexpected behavior: we have already read up to version 12, but we reload from the
checkpoint at version 10, applying more json than necessary (and restoring a
checkpoint that does not help, either).

  5. no new commits, should do nothing
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 10, size: 13, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[update] determined to load a checkpoint: CheckPoint { version: 10, size: 13, parts: None }
[restore_checkpoint]: CheckPoint { version: 10, size: 13, parts: None }
[apply_logs_after_current_version] applied 11
[apply_logs_after_current_version] applied 12
[apply_logs_after_current_version] applied 13
[apply_logs_after_current_version] applied 14
[apply_logs_after_current_version] applied 15
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] end of log : 17, rollback
waiting on keypress for update, current version 16

Unexpected behavior: we reload starting from the previous checkpoint, even though
no new commits were added to the delta log.

  6. 4 more commits: new checkpoint at v20
updating ...
[update] process started
[update] retrieved checkpoint CheckPoint { version: 20, size: 23, parts: None }, previous = Some(CheckPoint { version: 10, size: 13, parts: None })
[apply_logs_after_current_version] applied 16
[apply_logs_after_current_version] applied 17
[apply_logs_after_current_version] applied 18
[apply_logs_after_current_version] applied 19
[apply_logs_after_current_version] applied 20
[apply_logs_after_current_version] end of log : 21, rollback
waiting on keypress for update, current version 20

Unexpected behavior: despite having a checkpoint at 20, we use json all
the way up for versions 16 .. 20.

I believe the expected behavior can be achieved with a single-character change
in the logical expression. I'd gladly provide a pull request.
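
As a rough illustration of what the walkthrough would look like with that change, the following toy model replays the update steps above. Everything here is an assumption made for the sketch: `Table`, `UpdateReport`, and this `update` signature are hypothetical and not the crate's API, `version` tracks the last applied commit, and restoring a checkpoint is modeled as simply jumping to its version.

```rust
/// Simplified stand-in for the checkpoint metadata (assumption: version only).
#[derive(Debug, Clone, Copy, PartialEq)]
struct CheckPoint {
    version: i64,
}

/// Toy table state: last applied version plus the last checkpoint used.
#[derive(Debug)]
struct Table {
    version: i64,
    last_check_point: Option<CheckPoint>,
}

/// What a single update did: which checkpoint (if any) was restored and
/// which commit versions were applied from json.
#[derive(Debug, PartialEq)]
struct UpdateReport {
    restored: Option<i64>,
    json_applied: Vec<i64>,
}

impl Table {
    /// Hypothetical update: restore only when the latest checkpoint differs
    /// from the one used before, then apply the remaining json commits.
    fn update(&mut self, latest: Option<CheckPoint>, latest_commit: i64) -> UpdateReport {
        let mut restored = None;
        if let Some(cp) = latest {
            if self.last_check_point != Some(cp) {
                self.last_check_point = Some(cp);
                self.version = cp.version;
                restored = Some(cp.version);
            }
        }
        let json_applied: Vec<i64> = ((self.version + 1)..=latest_commit).collect();
        self.version = self.version.max(latest_commit);
        UpdateReport { restored, json_applied }
    }
}

fn main() {
    let cp10 = CheckPoint { version: 10 };
    let cp20 = CheckPoint { version: 20 };
    // State after the initial load in the walkthrough: version 4, no checkpoint.
    let mut table = Table { version: 4, last_check_point: None };

    // (latest checkpoint, latest commit) for each update step in the walkthrough.
    for (latest, latest_commit) in [(None, 8), (Some(cp10), 12), (Some(cp10), 16), (Some(cp10), 16), (Some(cp20), 20)] {
        let report = table.update(latest, latest_commit);
        println!("{:?} -> now at version {}", report, table.version);
    }
}
```

In this model, the unchanged checkpoint at v10 is restored only once, the update with no new commits does nothing, and the new checkpoint at v20 replaces the json replay.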

@houqp
Member

houqp commented Mar 14, 2021

Good catch @dispanser

@houqp
Member

houqp commented Mar 14, 2021

Thanks @dispanser for the detailed analysis log :)

@xianwill
Collaborator

@dispanser - re:

I wasn't able to write a self-contained test inside rust, as I'm not sure about
the status of write support (and checkpointing in particular)

Status of write support is very early (we just have the bits you've seen already regarding the transaction log so far), and checkpointing is not implemented at all yet. See #106 for the issue tracking checkpointing.
