Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker): Flush the stream writer on error (DGRAPH-2499) #6609

Merged
merged 6 commits into from
Nov 10, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions worker/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,23 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {
}

var writer badgerWriter
var flushed bool
if snap.SinceTs == 0 {
sw := pstore.NewStreamWriter()
if err := sw.Prepare(); err != nil {
return 0, err
}

writer = sw
defer func() {
if flushed {
return
}
// Flush the writer if it has not been flushed so that we can unblock the writes.
if err := writer.Flush(); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a cancel API on stream writer to fix this in a more elegant way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ajeetdsouza is working on resolving this.

glog.Errorf("write flush failed: %s", err)
}
}()
} else {
writer = pstore.NewManagedWriteBatch()
}
Expand Down Expand Up @@ -105,6 +115,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {
if err := writer.Flush(); err != nil {
return 0, err
}
flushed = true

if err := deleteStalePreds(ctx, done); err != nil {
return count, err
Expand Down