Skip to content

Commit

Permalink
Add notification ops and conversions
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Sep 9, 2024
1 parent 5231ede commit 299578a
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 29 deletions.
66 changes: 37 additions & 29 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,33 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) {
}

// Write the supplied message to the kernel.
func (c *Connection) writeMessage(msg []byte) error {
// Avoid the retry loop in os.File.Write.
n, err := syscall.Write(int(c.dev.Fd()), msg)
if err != nil {
return err
func (c *Connection) writeMessage(outMsg *buffer.OutMessage) error {
var err error
var n int
expectedLen := outMsg.Len()
if outMsg.Sglist != nil {
if fusekernel.IsPlatformFuseT {
// writev is not atomic on macos, restrict to fuse-t platform
writeLock.Lock()
defer writeLock.Unlock()
}
n, err = writev(int(c.dev.Fd()), outMsg.Sglist)
} else {
// Avoid the retry loop in os.File.Write.
n, err = syscall.Write(int(c.dev.Fd()), outMsg.OutHeaderBytes())
}

if n != len(msg) {
return fmt.Errorf("Wrote %d bytes; expected %d", n, len(msg))
if err == nil && n != expectedLen {
err = fmt.Errorf("Wrote %d bytes; expected %d", n, expectedLen)
}

return nil
if err != nil {
writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
if c.errorLogger != nil {
c.errorLogger.Print(writeErrMsg)
}
return fmt.Errorf(writeErrMsg)
}
outMsg.Sglist = nil
return err
}

// ReadOp consumes the next op from the kernel process, returning the op and a
Expand Down Expand Up @@ -527,25 +542,7 @@ func (c *Connection) Reply(ctx context.Context, opErr error) error {
noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr)

if !noResponse {
var err error
if outMsg.Sglist != nil {
if fusekernel.IsPlatformFuseT {
// writev is not atomic on macos, restrict to fuse-t platform
writeLock.Lock()
defer writeLock.Unlock()
}
_, err = writev(int(c.dev.Fd()), outMsg.Sglist)
} else {
err = c.writeMessage(outMsg.OutHeaderBytes())
}
if err != nil {
writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes())
if c.errorLogger != nil {
c.errorLogger.Print(writeErrMsg)
}
return fmt.Errorf(writeErrMsg)
}
outMsg.Sglist = nil
c.writeMessage(outMsg)
}

return nil
Expand All @@ -561,6 +558,17 @@ func (c *Connection) callbackForOp(op interface{}) func() {
return nil
}

// Notify sends a notification to the kernel.
//
// notification must be a pointer to one of the fuseops.NotifyXXX structures.
//
// To avoid a deadlock, notifications must not be sent in the execution path
// of a related filesystem operation or within any code that could hold a lock
// that could be needed to execute such an operation. As of kernel 4.18, a
// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(),
// rename(), link() or create() request for the parent, and a setattr(),
// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or
// readdirplus() request for the inode itself.
func (c *Connection) Notify(notification interface{}) error {
	msg := c.getOutMessage()
	defer c.putOutMessage(msg)

	// Serialize the notification first; only then is the final message
	// length known and can be recorded in the header.
	c.kernelNotification(msg, notification)
	hdr := msg.OutHeader()
	hdr.Len = uint32(msg.Len())

	return c.writeMessage(msg)
}

// Close the connection. Must not be called until operations that were read
// from the connection have been responded to.
func (c *Connection) close() error {
Expand Down
89 changes: 89 additions & 0 deletions conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,26 @@ func convertInMessage(
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}

case fusekernel.OpNotifyReply:
type input fusekernel.NotifyRetrieveIn
in := (*input)(inMsg.Consume(unsafe.Sizeof(input{})))
if in == nil {
return nil, errors.New("Corrupt OpNotifyReply")
}

buf := inMsg.ConsumeBytes(inMsg.Len())
if len(buf) < int(in.Size) {
return nil, errors.New("Corrupt OpNotifyReply")
}

o = &fuseops.NotifyRetrieveReplyOp{
Inode: fuseops.InodeID(inMsg.Header().Nodeid),
Unique: inMsg.Header().Unique,
Offset: in.Offset,
Length: in.Size,
OpContext: fuseops.OpContext{Pid: inMsg.Header().Pid},
}

default:
o = &unknownOp{
OpCode: inMsg.Header().Opcode,
Expand Down Expand Up @@ -780,6 +800,9 @@ func (c *Connection) kernelResponse(
case *fuseops.BatchForgetOp:
return true

case *fuseops.NotifyRetrieveReplyOp:
return true

case *interruptOp:
return true
}
Expand Down Expand Up @@ -1018,13 +1041,79 @@ func (c *Connection) kernelResponseForOp(
out := (*fusekernel.PollOut)(m.Grow(int(unsafe.Sizeof(fusekernel.PollOut{}))))
out.Revents = uint32(o.Revents)

case *fuseops.NotifyRetrieveReplyOp:
// Empty response

default:
panic(fmt.Sprintf("Unexpected op: %#v", op))
}

return
}

// kernelNotification serializes a notification into m so that it can be
// written to the kernel.
//
// op must be a pointer to one of the fuseops.Notify* structures handled
// below; any other value causes a panic. The header's Unique field is set to
// zero and its Error field to the notification code, and the
// notification-specific payload is appended after the header. The header's
// Len field is not set here.
func (c *Connection) kernelNotification(
	m *buffer.OutMessage,
	op interface{}) {

	hdr := m.OutHeader()
	hdr.Unique = 0

	// Fill in the notification code and the payload for this notification type.
	switch n := op.(type) {
	case *fuseops.NotifyPollWakeup:
		hdr.Error = fusekernel.NotifyCodePoll
		size := int(unsafe.Sizeof(fusekernel.NotifyPollWakeupOut{}))
		body := (*fusekernel.NotifyPollWakeupOut)(m.Grow(size))
		body.Kh = n.Kh

	case *fuseops.NotifyInvalInode:
		hdr.Error = fusekernel.NotifyCodeInvalInode
		size := int(unsafe.Sizeof(fusekernel.NotifyInvalInodeOut{}))
		body := (*fusekernel.NotifyInvalInodeOut)(m.Grow(size))
		body.Ino = uint64(n.Inode)
		body.Off = n.Offset
		body.Len = n.Length

	case *fuseops.NotifyInvalEntry:
		hdr.Error = fusekernel.NotifyCodeInvalEntry
		size := int(unsafe.Sizeof(fusekernel.NotifyInvalEntryOut{}))
		body := (*fusekernel.NotifyInvalEntryOut)(m.Grow(size))
		body.Parent = uint64(n.Parent)
		// Namelen excludes the NUL terminator appended after the name.
		body.Namelen = uint32(len(n.Name))
		m.AppendString(n.Name)
		m.AppendString("\x00")

	case *fuseops.NotifyDelete:
		hdr.Error = fusekernel.NotifyCodeDelete
		size := int(unsafe.Sizeof(fusekernel.NotifyDeleteOut{}))
		body := (*fusekernel.NotifyDeleteOut)(m.Grow(size))
		body.Parent = uint64(n.Parent)
		body.Child = uint64(n.Child)
		// Namelen excludes the NUL terminator appended after the name.
		body.Namelen = uint32(len(n.Name))
		m.AppendString(n.Name)
		m.AppendString("\x00")

	case *fuseops.NotifyStore:
		hdr.Error = fusekernel.NotifyCodeStore
		size := int(unsafe.Sizeof(fusekernel.NotifyStoreOut{}))
		body := (*fusekernel.NotifyStoreOut)(m.Grow(size))
		body.Nodeid = uint64(n.Inode)
		body.Offset = n.Offset
		body.Size = n.Length
		m.Append(n.Data...)
		// Cut the message to exactly Length bytes of data after the
		// fixed-size part.
		m.ShrinkTo(buffer.OutMessageHeaderSize + size + int(n.Length))

	case *fuseops.NotifyRetrieve:
		hdr.Error = fusekernel.NotifyCodeRetrieve
		size := int(unsafe.Sizeof(fusekernel.NotifyRetrieveOut{}))
		body := (*fusekernel.NotifyRetrieveOut)(m.Grow(size))
		body.Unique = n.Unique
		body.Nodeid = uint64(n.Inode)
		body.Offset = n.Offset
		body.Size = n.Length

	default:
		panic(fmt.Sprintf("Unexpected notification: %#v", op))
	}
}

////////////////////////////////////////////////////////////////////////
// General conversions
////////////////////////////////////////////////////////////////////////
Expand Down
99 changes: 99 additions & 0 deletions fuseops/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,3 +1034,102 @@ type PollOp struct {
Revents fusekernel.PollEvents
OpContext OpContext
}

// NotifyPollWakeup notifies consumers waiting for poll/epoll that events are
// incoming for the specified kernel handle. The kernel will send a PollOp
// request to get the event mask after receiving this notification.
type NotifyPollWakeup struct {
// Kernel handle to wake up. It is copied unchanged into the Kh field of the
// outgoing wire message.
Kh uint64
}

// NotifyInvalInode is a notification to invalidate the cache for an inode.
//
// If the filesystem has writeback caching enabled, invalidating an inode
// will first trigger a writeback of all dirty pages. The call will block
// until all writeback requests have completed and the inode has been
// invalidated. It will, however, not wait for completion of pending writeback
// requests that have been issued before.
type NotifyInvalInode struct {
// The inode whose cache is to be invalidated.
Inode InodeID
// Offset and Length are passed through unchanged as the Off and Len fields
// of the wire message.
// NOTE(review): presumably the byte range to invalidate; confirm the exact
// meaning of negative/zero values against the kernel documentation.
Offset int64
Length int64
}

// NotifyInvalEntry is a notification to invalidate parent attributes and the
// dentry matching parent/name.
//
// To avoid a deadlock this request must not be sent in the execution path
// of a related filesystem operation or within any code that could hold a lock
// that could be needed to execute such an operation. As of kernel 4.18, a
// "related operation" is a lookup(), symlink(), mknod(), mkdir(), unlink(),
// rename(), link() or create() request for the parent, and a setattr(),
// unlink(), rmdir(), rename(), setxattr(), removexattr(), readdir() or
// readdirplus() request for the inode itself.
//
// When called correctly, it will never block.
type NotifyInvalEntry struct {
// The parent directory inode.
Parent InodeID
// The entry name within Parent. It is sent followed by a NUL terminator,
// which is not counted in the Namelen field of the wire message.
Name string
}

// NotifyDelete behaves like NotifyInvalEntry with the following additional
// effect (at least as of Linux kernel 4.8):
//
// If the provided child inode matches the inode that is currently associated
// with the cached dentry, and if there are any inotify watches registered for
// the dentry, then the watchers are informed that the dentry has been deleted.
//
// To avoid a deadlock this request must not be sent while executing a
// related filesystem operation or while holding a lock that could be needed to
// execute such an operation.
type NotifyDelete struct {
// The parent directory inode.
Parent InodeID
// The child inode to match against the cached dentry.
Child InodeID
// The entry name within Parent. It is sent followed by a NUL terminator,
// which is not counted in the Namelen field of the wire message.
Name string
}

// NotifyStore stores data to the kernel buffers.
//
// Synchronously store data in the kernel buffers belonging to the given inode.
// The stored data is marked up-to-date (no read will be performed against it,
// unless it's invalidated or evicted from the cache).
//
// If the stored data overflows the current file size, then the size is extended,
// similarly to a write(2) on the filesystem.
//
// If this request returns an error, then the store wasn't fully completed, but
// it may have been partially completed.
type NotifyStore struct {
// The inode whose kernel buffers receive the data.
Inode InodeID
// Sent unchanged as the Offset field of the wire message.
Offset uint64
// Sent as the Size field of the wire message. After Data has been appended,
// the outgoing message is cut to exactly this many bytes following the
// fixed-size part.
// NOTE(review): presumably Data must contain at least Length bytes in total;
// confirm against buffer.OutMessage.ShrinkTo.
Length uint32
// The data to store, as a list of buffers appended to the message in order.
Data [][]byte
}

// NotifyRetrieve retrieves data from the kernel buffers belonging to the
// given inode.
//
// If successful then the kernel will send a NotifyRetrieveReplyOp as a reply.
// Only present pages are returned in the retrieve reply. Retrieving stops when it
// finds a non-present page and only data prior to that is returned.
//
// If this request returns an error, then the retrieve will not be completed and
// no reply will be sent.
//
// This request doesn't change the dirty state of pages in the kernel buffer. For
// dirty pages the write() method will be called regardless of having been retrieved
// previously.
type NotifyRetrieve struct {
// The inode whose kernel buffers are read.
Inode InodeID
// Sent unchanged as the Unique field of the wire message.
// NOTE(review): presumably echoed back in the Unique field of the resulting
// NotifyRetrieveReplyOp so that request and reply can be matched; confirm.
Unique uint64
// Offset and Length are sent unchanged as the Offset and Size fields of the
// wire message.
Offset uint64
Length uint32
}

// NotifyRetrieveReplyOp is the op created for an incoming OpNotifyReply
// message from the kernel. No response is sent to the kernel for this op.
//
// NOTE(review): the previous comment here read "Matches the size of WriteIn";
// that presumably describes the wire-level input struct rather than this op.
type NotifyRetrieveReplyOp struct {
// Taken from the Nodeid field of the incoming message header.
Inode InodeID
// Taken from the Unique field of the incoming message header.
Unique uint64
// Offset and Length are taken from the Offset and Size fields of the
// incoming message payload.
Offset uint64
Length uint32

// Carries the Pid from the incoming message header.
OpContext OpContext
}
4 changes: 4 additions & 0 deletions fuseutil/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type FileSystem interface {
SyncFS(context.Context, *fuseops.SyncFSOp) error
Poll(context.Context, *fuseops.PollOp) error

SetConnection(*fuse.Connection)

// Regard all inodes (including the root inode) as having their lookup counts
// decremented to zero, and clean up any resources associated with the file
// system. No further calls to the file system will be made.
Expand Down Expand Up @@ -97,6 +99,8 @@ type fileSystemServer struct {
}

func (s *fileSystemServer) ServeOps(c *fuse.Connection) {
s.fs.SetConnection(c)

// When we are done, we clean up by waiting for all in-flight ops then
// destroying the file system.
defer func() {
Expand Down
3 changes: 3 additions & 0 deletions fuseutil/not_implemented_file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,8 @@ func (fs *NotImplementedFileSystem) Poll(
return fuse.ENOSYS
}

// SetConnection is a no-op: the supplied connection is ignored.
func (fs *NotImplementedFileSystem) SetConnection(*fuse.Connection) {
}

// Destroy is a no-op.
func (fs *NotImplementedFileSystem) Destroy() {
}

0 comments on commit 299578a

Please sign in to comment.