Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not review] Sync/flush changes for buffered writes #2778

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 75 additions & 15 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,19 +500,26 @@ func (f *FileInode) Read(
func (f *FileInode) Write(
ctx context.Context,
data []byte,
offset int64) (err error) {
offset int64) error {
// For empty GCS files also we will trigger bufferedWrites flow.
if f.src.Size == 0 && f.config.Write.ExperimentalEnableStreamingWrites {
err = f.ensureBufferedWriteHandler(ctx)
err := f.ensureBufferedWriteHandler(ctx)
if err != nil {
return
return err
}
}

if f.bwh != nil {
return f.bwh.Write(data, offset)
return f.writeUsingBufferedWrites(ctx, data, offset)
}

return f.writeUsingTempFile(ctx, data, offset)
}

// Helper function to serve write for file using temp file.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset int64) (err error) {
// Make sure f.content != nil.
err = f.ensureContent(ctx)
if err != nil {
Expand All @@ -527,6 +534,49 @@ func (f *FileInode) Write(
return
}

// Helper function to serve write for file using buffered writes handler.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) error {
err := f.bwh.Write(data, offset)
if err == bufferedwrites.ErrOutOfOrderWrite || err == bufferedwrites.ErrUploadFailure {
// Finalize the object.
flushErr := f.flushUsingBufferedWriteHandler()
if flushErr != nil {
return fmt.Errorf("bwh.Write failed: %v, could not finalize what has been written so far: %w", err, flushErr)
}
}

// Fall back to temp file for Out-Of-Order Writes.
if err == bufferedwrites.ErrOutOfOrderWrite {
return f.writeUsingTempFile(ctx, data, offset)
}

return err
}

// Helper function to flush buffered writes handler and update inode state with
// new object.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) flushUsingBufferedWriteHandler() error {
obj, err := f.bwh.Flush()

var preconditionErr *gcs.PreconditionError
if errors.As(err, &preconditionErr) {
return &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("f.bwh.Flush(): %w", err),
}
}

if err != nil {
return fmt.Errorf("f.bwh.Flush(): %w", err)
}

f.updateInodeStateAfterSync(obj)
return nil
}

// Set the mtime for this file. May involve a round trip to GCS.
//
// LOCKS_REQUIRED(f.mu)
Expand Down Expand Up @@ -639,10 +689,15 @@ func (f *FileInode) fetchLatestGcsObject(ctx context.Context) (*gcs.Object, erro
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Sync(ctx context.Context) (err error) {
// If we have not been dirtied, there is nothing to do.
if f.content == nil {
if f.content == nil && f.bwh == nil {
return
}

if f.bwh != nil {
// Finalize the object.
return f.flushUsingBufferedWriteHandler()
}

latestGcsObj, err := f.fetchLatestGcsObject(ctx)
if err != nil {
return
Expand All @@ -666,21 +721,26 @@ func (f *FileInode) Sync(ctx context.Context) (err error) {
err = fmt.Errorf("SyncObject: %w", err)
return
}

minObj := storageutil.ConvertObjToMinObject(newObj)
// If we wrote out a new object, we need to update our state.
if newObj != nil && !f.localFileCache {
var minObj gcs.MinObject
minObjPtr := storageutil.ConvertObjToMinObject(newObj)
if minObjPtr != nil {
minObj = *minObjPtr
}
f.src = minObj
f.updateInodeStateAfterSync(minObj)
return
}

func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
}
f.content.Destroy()
f.content = nil
if f.content != nil {
f.content.Destroy()
f.content = nil
}
if f.bwh != nil {
f.bwh = nil
}
}

return
Expand Down
Loading
Loading