From 299578a3b301d394d17fdfcb6c1ae9b42b5b3500 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 23 Mar 2023 00:30:11 +0300 Subject: [PATCH] Add notification ops and conversions --- connection.go | 66 +++++++++-------- conversions.go | 89 ++++++++++++++++++++++ fuseops/ops.go | 99 +++++++++++++++++++++++++ fuseutil/file_system.go | 4 + fuseutil/not_implemented_file_system.go | 3 + 5 files changed, 232 insertions(+), 29 deletions(-) diff --git a/connection.go b/connection.go index a3c193d..6172ac5 100644 --- a/connection.go +++ b/connection.go @@ -375,18 +375,33 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { } // Write the supplied message to the kernel. -func (c *Connection) writeMessage(msg []byte) error { - // Avoid the retry loop in os.File.Write. - n, err := syscall.Write(int(c.dev.Fd()), msg) - if err != nil { - return err +func (c *Connection) writeMessage(outMsg *buffer.OutMessage) error { + var err error + var n int + expectedLen := outMsg.Len() + if outMsg.Sglist != nil { + if fusekernel.IsPlatformFuseT { + // writev is not atomic on macos, restrict to fuse-t platform + writeLock.Lock() + defer writeLock.Unlock() + } + n, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + // Avoid the retry loop in os.File.Write. + n, err = syscall.Write(int(c.dev.Fd()), outMsg.OutHeaderBytes()) } - - if n != len(msg) { - return fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg)) + if err == nil && n != expectedLen { + err = fmt.Errorf("Wrote %d bytes; expected %d", n, expectedLen) } - - return nil + if err != nil { + writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) + if c.errorLogger != nil { + c.errorLogger.Print(writeErrMsg) + } + return fmt.Errorf(writeErrMsg) + } + outMsg.Sglist = nil + return err } // ReadOp consumes the next op from the kernel process, returning the op and a @@ -527,25 +542,7 @@ func (c *Connection) Reply(ctx context.Context, opErr error) error { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - var err error - if outMsg.Sglist != nil { - if fusekernel.IsPlatformFuseT { - // writev is not atomic on macos, restrict to fuse-t platform - writeLock.Lock() - defer writeLock.Unlock() - } - _, err = writev(int(c.dev.Fd()), outMsg.Sglist) - } else { - err = c.writeMessage(outMsg.OutHeaderBytes()) - } - if err != nil { - writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) - if c.errorLogger != nil { - c.errorLogger.Print(writeErrMsg) - } - return fmt.Errorf(writeErrMsg) - } - outMsg.Sglist = nil + c.writeMessage(outMsg) } return nil @@ -561,6 +558,17 @@ func (c *Connection) callbackForOp(op interface{}) func() { return nil } +// Send a notification to the kernel +// notification must be a pointer to one of fuseops.NotifyXXX structures +// To avoid a deadlock notifications must not be called in the execution path of a related filesytem operation or within any code that could hold a lock that could be needed to execute such an operation. As of kernel 4.18, a "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), rename(), link() or create() request for the parent, and a setattr(), unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or readdirplus() request for the inode itself. +func (c *Connection) Notify(notification interface{}) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + c.kernelNotification(outMsg, notification) + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeMessage(outMsg) +} + // Close the connection. Must not be called until operations that were read // from the connection have been responded to. func (c *Connection) close() error { diff --git a/conversions.go b/conversions.go index b2b5d04..a77ced0 100644 --- a/conversions.go +++ b/conversions.go @@ -747,6 +747,26 @@ func convertInMessage( OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, } + case fusekernel.OpNotifyReply: + type input fusekernel.NotifyRetrieveIn + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) + if in == nil { + return nil, errors.New("Corrupt OpNotifyReply") + } + + buf := inMsg.ConsumeBytes(inMsg.Len()) + if len(buf) < int(in.Size) { + return nil, errors.New("Corrupt OpNotifyReply") + } + + o = &fuseops.NotifyRetrieveReplyOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Unique: inMsg.Header().Unique, + Offset: in.Offset, + Length: in.Size, + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + default: o = &unknownOp{ OpCode: inMsg.Header().Opcode, @@ -780,6 +800,9 @@ func (c *Connection) kernelResponse( case *fuseops.BatchForgetOp: return true + case *fuseops.NotifyRetrieveReplyOp: + return true + case *interruptOp: return true } @@ -1018,6 +1041,9 @@ func (c *Connection) kernelResponseForOp( out := (*fusekernel.PollOut)(m.Grow(int(unsafe.Sizeof(fusekernel.PollOut{})))) out.Revents = uint32(o.Revents) + case *fuseops.NotifyRetrieveReplyOp: + // Empty response + default: panic(fmt.Sprintf("Unexpected op: %#v", op)) } @@ -1025,6 +1051,69 @@ func (c *Connection) kernelResponseForOp( return } +// Like kernelResponse, but assumes the user replied with a nil error to the op. +func (c *Connection) kernelNotification( + m *buffer.OutMessage, + op interface{}) { + + h := m.OutHeader() + h.Unique = 0 + + // Create the appropriate output message + switch o := op.(type) { + case *fuseops.NotifyPollWakeup: + h.Error = fusekernel.NotifyCodePoll + out := (*fusekernel.NotifyPollWakeupOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyPollWakeupOut{})))) + out.Kh = o.Kh + + case *fuseops.NotifyInvalInode: + h.Error = fusekernel.NotifyCodeInvalInode + out := (*fusekernel.NotifyInvalInodeOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalInodeOut{})))) + out.Ino = uint64(o.Inode) + out.Off = o.Offset + out.Len = o.Length + + case *fuseops.NotifyInvalEntry: + h.Error = fusekernel.NotifyCodeInvalEntry + out := (*fusekernel.NotifyInvalEntryOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalEntryOut{})))) + out.Parent = uint64(o.Parent) + out.Namelen = uint32(len(o.Name)) + m.AppendString(o.Name) + m.AppendString("\x00") + + case *fuseops.NotifyDelete: + h.Error = fusekernel.NotifyCodeDelete + out := (*fusekernel.NotifyDeleteOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyDeleteOut{})))) + out.Parent = uint64(o.Parent) + out.Child = uint64(o.Child) + out.Namelen = uint32(len(o.Name)) + m.AppendString(o.Name) + m.AppendString("\x00") + + case *fuseops.NotifyStore: + h.Error = fusekernel.NotifyCodeStore + out := (*fusekernel.NotifyStoreOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})))) + out.Nodeid = uint64(o.Inode) + out.Offset = o.Offset + out.Size = o.Length + m.Append(o.Data...) + m.ShrinkTo(buffer.OutMessageHeaderSize + int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})) + int(o.Length)) + + case *fuseops.NotifyRetrieve: + h.Error = fusekernel.NotifyCodeRetrieve + out := (*fusekernel.NotifyRetrieveOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyRetrieveOut{})))) + out.Unique = o.Unique + out.Nodeid = uint64(o.Inode) + out.Offset = o.Offset + out.Size = o.Length + + default: + panic(fmt.Sprintf("Unexpected notification: %#v", op)) + } + + return +} + //////////////////////////////////////////////////////////////////////// // General conversions //////////////////////////////////////////////////////////////////////// diff --git a/fuseops/ops.go b/fuseops/ops.go index 4227c5b..00fdd6f 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -1034,3 +1034,102 @@ type PollOp struct { Revents fusekernel.PollEvents OpContext OpContext } + +// Notify consumers waiting for poll/epoll that events are incoming +// for the specified kernel handle. The kernel will send a PollOp request +// to get the event mask after receiving this notification +type NotifyPollWakeup struct { + Kh uint64 +} + +// Notify to invalidate cache for an inode. +// +// If the filesystem has writeback caching enabled, invalidating an inode +// will first trigger a writeback of all dirty pages. The call will block +// until all writeback requests have completed and the inode has been +// invalidated. It will, however, not wait for completion of pending writeback +// requests that have been issued before. +type NotifyInvalInode struct { + Inode InodeID + Offset int64 + Length int64 +} + +// Notify to invalidate parent attributes and the dentry matching parent/name +// +// To avoid a deadlock this request must not be sent in the execution path +// of a related filesytem operation or within any code that could hold a lock +// that could be needed to execute such an operation. As of kernel 4.18, a +// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), +// rename(), link() or create() request for the parent, and a setattr(), +// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or +// readdirplus() request for the inode itself. +// +// When called correctly, it will never block. +type NotifyInvalEntry struct { + Parent InodeID + Name string +} + +// This request behaves like NotifyInvalEntry with the following additional +// effect (at least as of Linux kernel 4.8): +// +// If the provided child inode matches the inode that is currently associated +// with the cached dentry, and if there are any inotify watches registered for +// the dentry, then the watchers are informed that the dentry has been deleted. +// +// To avoid a deadlock this request must not be sent while executing a +// related filesytem operation or while holding a lock that could be needed to +// execute such an operation. +type NotifyDelete struct { + Parent InodeID + Child InodeID + Name string +} + +// Store data to the kernel buffers +// +// Synchronously store data in the kernel buffers belonging to the given inode. +// The stored data is marked up-to-date (no read will be performed against it, +// unless it's invalidated or evicted from the cache). +// +// If the stored data overflows the current file size, then the size is extended, +// similarly to a write(2) on the filesystem. +// +// If this request returns an error, then the store wasn't fully completed, but +// it may have been partially completed. +type NotifyStore struct { + Inode InodeID + Offset uint64 + Length uint32 + Data [][]byte +} + +// Retrieve data from the kernel buffers belonging to the given inode +// +// If successful then the kernel will send a NotifyRetrieveReplyOp as a reply. +// Only present pages are returned in the retrieve reply. Retrieving stops when it +// finds a non-present page and only data prior to that is returned. +// +// If this request returns an error, then the retrieve will not be completed and +// no reply will be sent. +// +// This request doesn't change the dirty state of pages in the kernel buffer. For +// dirty pages the write() method will be called regardless of having been retrieved +// previously. +type NotifyRetrieve struct { + Inode InodeID + Unique uint64 + Offset uint64 + Length uint32 +} + +// Matches the size of WriteIn +type NotifyRetrieveReplyOp struct { + Inode InodeID + Unique uint64 + Offset uint64 + Length uint32 + + OpContext OpContext +} diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index bc8a503..9aee3ac 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -66,6 +66,8 @@ type FileSystem interface { SyncFS(context.Context, *fuseops.SyncFSOp) error Poll(context.Context, *fuseops.PollOp) error + SetConnection(*fuse.Connection) + // Regard all inodes (including the root inode) as having their lookup counts // decremented to zero, and clean up any resources associated with the file // system. No further calls to the file system will be made. @@ -97,6 +99,8 @@ type fileSystemServer struct { } func (s *fileSystemServer) ServeOps(c *fuse.Connection) { + s.fs.SetConnection(c) + // When we are done, we clean up by waiting for all in-flight ops then // destroying the file system. defer func() { diff --git a/fuseutil/not_implemented_file_system.go b/fuseutil/not_implemented_file_system.go index 971f90b..d55ab72 100644 --- a/fuseutil/not_implemented_file_system.go +++ b/fuseutil/not_implemented_file_system.go @@ -216,5 +216,8 @@ func (fs *NotImplementedFileSystem) Poll( return fuse.ENOSYS } +func (fs *NotImplementedFileSystem) SetConnection(*fuse.Connection) { +} + func (fs *NotImplementedFileSystem) Destroy() { }