From a091e493191e6affb5edb31a9546bc1bff606a29 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 23 Mar 2023 00:30:11 +0300 Subject: [PATCH] Add notification ops and conversions --- connection.go | 48 +++++++----- conversions.go | 89 ++++++++++++++++++++++ fuseops/ops.go | 99 +++++++++++++++++++++++++ fuseutil/file_system.go | 4 + fuseutil/not_implemented_file_system.go | 3 + 5 files changed, 223 insertions(+), 20 deletions(-) diff --git a/connection.go b/connection.go index a970316c..8d3e2e4f 100644 --- a/connection.go +++ b/connection.go @@ -369,18 +369,24 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { } // Write the supplied message to the kernel. -func (c *Connection) writeMessage(msg []byte) error { - // Avoid the retry loop in os.File.Write. - n, err := syscall.Write(int(c.dev.Fd()), msg) - if err != nil { - return err +func (c *Connection) writeMessage(outMsg *buffer.OutMessage) error { + var err error + var n int + expectedLen := outMsg.Len() + if outMsg.Sglist != nil { + n, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + // Avoid the retry loop in os.File.Write. + n, err = syscall.Write(int(c.dev.Fd()), outMsg.OutHeaderBytes()) } - - if n != len(msg) { - return fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg)) + if err == nil && n != expectedLen { + err = fmt.Errorf("Wrote %d bytes; expected %d", n, expectedLen) } - - return nil + if err != nil && c.errorLogger != nil { + c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) + } + outMsg.Sglist = nil + return err } // ReadOp consumes the next op from the kernel process, returning the op and a @@ -510,19 +516,21 @@ func (c *Connection) Reply(ctx context.Context, opErr error) { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - var err error - if outMsg.Sglist != nil { - _, err = writev(int(c.dev.Fd()), outMsg.Sglist) - } else { - err = c.writeMessage(outMsg.OutHeaderBytes()) - } - if err != nil && c.errorLogger != nil { - c.errorLogger.Printf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) - } - outMsg.Sglist = nil + c.writeMessage(outMsg) } } +// Send a notification to the kernel +// notification must be a pointer to one of fuseops.NotifyXXX structures +// To avoid a deadlock notifications must not be called in the execution path of a related filesytem operation or within any code that could hold a lock that could be needed to execute such an operation. As of kernel 4.18, a "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), rename(), link() or create() request for the parent, and a setattr(), unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or readdirplus() request for the inode itself. +func (c *Connection) Notify(notification interface{}) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + c.kernelNotification(outMsg, notification) + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeMessage(outMsg) +} + // Close the connection. Must not be called until operations that were read // from the connection have been responded to. func (c *Connection) close() error { diff --git a/conversions.go b/conversions.go index b0c25d5f..3c14f92e 100644 --- a/conversions.go +++ b/conversions.go @@ -704,6 +704,26 @@ func convertInMessage( OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, } + case fusekernel.OpNotifyReply: + type input fusekernel.NotifyRetrieveIn + in := (*input)(inMsg.Consume(unsafe.Sizeof(input{}))) + if in == nil { + return nil, errors.New("Corrupt OpNotifyReply") + } + + buf := inMsg.ConsumeBytes(inMsg.Len()) + if len(buf) < int(in.Size) { + return nil, errors.New("Corrupt OpNotifyReply") + } + + o = &fuseops.NotifyRetrieveReplyOp{ + Inode: fuseops.InodeID(inMsg.Header().Nodeid), + Unique: inMsg.Header().Unique, + Offset: in.Offset, + Length: in.Size, + OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid}, + } + default: o = &unknownOp{ OpCode: inMsg.Header().Opcode, @@ -737,6 +757,9 @@ func (c *Connection) kernelResponse( case *fuseops.BatchForgetOp: return true + case *fuseops.NotifyRetrieveReplyOp: + return true + case *interruptOp: return true } @@ -964,6 +987,9 @@ func (c *Connection) kernelResponseForOp( out := (*fusekernel.PollOut)(m.Grow(int(unsafe.Sizeof(fusekernel.PollOut{})))) out.Revents = uint32(o.Revents) + case *fuseops.NotifyRetrieveReplyOp: + // Empty response + default: panic(fmt.Sprintf("Unexpected op: %#v", op)) } @@ -971,6 +997,69 @@ func (c *Connection) kernelResponseForOp( return } +// Like kernelResponse, but assumes the user replied with a nil error to the op. +func (c *Connection) kernelNotification( + m *buffer.OutMessage, + op interface{}) { + + h := m.OutHeader() + h.Unique = 0 + + // Create the appropriate output message + switch o := op.(type) { + case *fuseops.NotifyPollWakeup: + h.Error = fusekernel.NotifyCodePoll + out := (*fusekernel.NotifyPollWakeupOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyPollWakeupOut{})))) + out.Kh = o.Kh + + case *fuseops.NotifyInvalInode: + h.Error = fusekernel.NotifyCodeInvalInode + out := (*fusekernel.NotifyInvalInodeOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalInodeOut{})))) + out.Ino = uint64(o.Inode) + out.Off = o.Offset + out.Len = o.Length + + case *fuseops.NotifyInvalEntry: + h.Error = fusekernel.NotifyCodeInvalEntry + out := (*fusekernel.NotifyInvalEntryOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyInvalEntryOut{})))) + out.Parent = uint64(o.Parent) + out.Namelen = uint32(len(o.Name)) + m.AppendString(o.Name) + m.AppendString("\x00") + + case *fuseops.NotifyDelete: + h.Error = fusekernel.NotifyCodeDelete + out := (*fusekernel.NotifyDeleteOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyDeleteOut{})))) + out.Parent = uint64(o.Parent) + out.Child = uint64(o.Child) + out.Namelen = uint32(len(o.Name)) + m.AppendString(o.Name) + m.AppendString("\x00") + + case *fuseops.NotifyStore: + h.Error = fusekernel.NotifyCodeStore + out := (*fusekernel.NotifyStoreOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})))) + out.Nodeid = uint64(o.Inode) + out.Offset = o.Offset + out.Size = o.Length + m.Append(o.Data...) + m.ShrinkTo(buffer.OutMessageHeaderSize + int(unsafe.Sizeof(fusekernel.NotifyStoreOut{})) + int(o.Length)) + + case *fuseops.NotifyRetrieve: + h.Error = fusekernel.NotifyCodeRetrieve + out := (*fusekernel.NotifyRetrieveOut)(m.Grow(int(unsafe.Sizeof(fusekernel.NotifyRetrieveOut{})))) + out.Unique = o.Unique + out.Nodeid = uint64(o.Inode) + out.Offset = o.Offset + out.Size = o.Length + + default: + panic(fmt.Sprintf("Unexpected notification: %#v", op)) + } + + return +} + //////////////////////////////////////////////////////////////////////// // General conversions //////////////////////////////////////////////////////////////////////// diff --git a/fuseops/ops.go b/fuseops/ops.go index 9001b746..335416bb 100644 --- a/fuseops/ops.go +++ b/fuseops/ops.go @@ -990,3 +990,102 @@ type PollOp struct { Revents fusekernel.PollEvents OpContext OpContext } + +// Notify consumers waiting for poll/epoll that events are incoming +// for the specified kernel handle. The kernel will send a PollOp request +// to get the event mask after receiving this notification +type NotifyPollWakeup struct { + Kh uint64 +} + +// Notify to invalidate cache for an inode. +// +// If the filesystem has writeback caching enabled, invalidating an inode +// will first trigger a writeback of all dirty pages. The call will block +// until all writeback requests have completed and the inode has been +// invalidated. It will, however, not wait for completion of pending writeback +// requests that have been issued before. +type NotifyInvalInode struct { + Inode InodeID + Offset int64 + Length int64 +} + +// Notify to invalidate parent attributes and the dentry matching parent/name +// +// To avoid a deadlock this request must not be sent in the execution path +// of a related filesytem operation or within any code that could hold a lock +// that could be needed to execute such an operation. As of kernel 4.18, a +// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(), +// rename(), link() or create() request for the parent, and a setattr(), +// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or +// readdirplus() request for the inode itself. +// +// When called correctly, it will never block. +type NotifyInvalEntry struct { + Parent InodeID + Name string +} + +// This request behaves like NotifyInvalEntry with the following additional +// effect (at least as of Linux kernel 4.8): +// +// If the provided child inode matches the inode that is currently associated +// with the cached dentry, and if there are any inotify watches registered for +// the dentry, then the watchers are informed that the dentry has been deleted. +// +// To avoid a deadlock this request must not be sent while executing a +// related filesytem operation or while holding a lock that could be needed to +// execute such an operation. +type NotifyDelete struct { + Parent InodeID + Child InodeID + Name string +} + +// Store data to the kernel buffers +// +// Synchronously store data in the kernel buffers belonging to the given inode. +// The stored data is marked up-to-date (no read will be performed against it, +// unless it's invalidated or evicted from the cache). +// +// If the stored data overflows the current file size, then the size is extended, +// similarly to a write(2) on the filesystem. +// +// If this request returns an error, then the store wasn't fully completed, but +// it may have been partially completed. +type NotifyStore struct { + Inode InodeID + Offset uint64 + Length uint32 + Data [][]byte +} + +// Retrieve data from the kernel buffers belonging to the given inode +// +// If successful then the kernel will send a NotifyRetrieveReplyOp as a reply. +// Only present pages are returned in the retrieve reply. Retrieving stops when it +// finds a non-present page and only data prior to that is returned. +// +// If this request returns an error, then the retrieve will not be completed and +// no reply will be sent. +// +// This request doesn't change the dirty state of pages in the kernel buffer. For +// dirty pages the write() method will be called regardless of having been retrieved +// previously. +type NotifyRetrieve struct { + Inode InodeID + Unique uint64 + Offset uint64 + Length uint32 +} + +// Matches the size of WriteIn +type NotifyRetrieveReplyOp struct { + Inode InodeID + Unique uint64 + Offset uint64 + Length uint32 + + OpContext OpContext +} diff --git a/fuseutil/file_system.go b/fuseutil/file_system.go index bef2145a..94789040 100644 --- a/fuseutil/file_system.go +++ b/fuseutil/file_system.go @@ -65,6 +65,8 @@ type FileSystem interface { Fallocate(context.Context, *fuseops.FallocateOp) error Poll(context.Context, *fuseops.PollOp) error + SetConnection(*fuse.Connection) + // Regard all inodes (including the root inode) as having their lookup counts // decremented to zero, and clean up any resources associated with the file // system. No further calls to the file system will be made. @@ -96,6 +98,8 @@ type fileSystemServer struct { } func (s *fileSystemServer) ServeOps(c *fuse.Connection) { + s.fs.SetConnection(c) + // When we are done, we clean up by waiting for all in-flight ops then // destroying the file system. defer func() { diff --git a/fuseutil/not_implemented_file_system.go b/fuseutil/not_implemented_file_system.go index 356327ef..622c3914 100644 --- a/fuseutil/not_implemented_file_system.go +++ b/fuseutil/not_implemented_file_system.go @@ -210,5 +210,8 @@ func (fs *NotImplementedFileSystem) Poll( return fuse.ENOSYS } +func (fs *NotImplementedFileSystem) SetConnection(*fuse.Connection) { +} + func (fs *NotImplementedFileSystem) Destroy() { }