Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flush to manually unblock Read/ReadInto #1491

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 79 additions & 24 deletions internal/epoll/poller.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package epoll

import (
"errors"
"fmt"
"math"
"os"
"runtime"
"slices"
"sync"
"time"

"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/unix"
)

var ErrFlushed = errors.New("data was flushed")

// Poller waits for readiness notifications from multiple file descriptors.
//
// The wait can be interrupted by calling Close.
Expand All @@ -21,27 +25,48 @@ type Poller struct {
epollMu sync.Mutex
epollFd int

eventMu sync.Mutex
event *eventFd
eventMu sync.Mutex
closeEvent *eventFd
flushEvent *eventFd
}

func New() (*Poller, error) {
func New() (_ *Poller, err error) {
closeFDOnError := func(fd int) {
if err != nil {
unix.Close(fd)
}
}
closeEventFDOnError := func(e *eventFd) {
if err != nil {
e.close()
}
}

epollFd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, fmt.Errorf("create epoll fd: %v", err)
}
defer closeFDOnError(epollFd)

p := &Poller{epollFd: epollFd}
p.event, err = newEventFd()
p.closeEvent, err = newEventFd()
if err != nil {
return nil, err
}
defer closeEventFDOnError(p.closeEvent)

p.flushEvent, err = newEventFd()
if err != nil {
unix.Close(epollFd)
return nil, err
}
defer closeEventFDOnError(p.flushEvent)

if err := p.Add(p.closeEvent.raw, 0); err != nil {
return nil, fmt.Errorf("add close eventfd: %w", err)
}

if err := p.Add(p.event.raw, 0); err != nil {
unix.Close(epollFd)
p.event.close()
return nil, fmt.Errorf("add eventfd: %w", err)
if err := p.Add(p.flushEvent.raw, 0); err != nil {
return nil, fmt.Errorf("add flush eventfd: %w", err)
}

runtime.SetFinalizer(p, (*Poller).Close)
Expand All @@ -55,8 +80,8 @@ func New() (*Poller, error) {
func (p *Poller) Close() error {
runtime.SetFinalizer(p, nil)

// Interrupt Wait() via the event fd if it's currently blocked.
if err := p.wakeWait(); err != nil {
// Interrupt Wait() via the closeEvent fd if it's currently blocked.
if err := p.wakeWaitForClose(); err != nil {
return err
}

Expand All @@ -73,9 +98,14 @@ func (p *Poller) Close() error {
p.epollFd = -1
}

if p.event != nil {
p.event.close()
p.event = nil
if p.closeEvent != nil {
p.closeEvent.close()
p.closeEvent = nil
}

if p.flushEvent != nil {
p.flushEvent.close()
p.flushEvent = nil
}

return nil
Expand Down Expand Up @@ -118,8 +148,11 @@ func (p *Poller) Add(fd int, id int) error {

// Wait for events.
//
// Returns the number of pending events or an error wrapping os.ErrClosed if
// Close is called, or os.ErrDeadlineExceeded if EpollWait timeout.
// Returns the number of pending events and any errors.
//
// - [os.ErrClosed] if interrupted by [Close].
// - [ErrFlushed] if interrupted by [Flush].
// - [os.ErrDeadlineExceeded] if deadline is reached.
func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) {
p.epollMu.Lock()
defer p.epollMu.Unlock()
Expand Down Expand Up @@ -154,33 +187,55 @@ func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error)
return 0, fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded)
}

for _, event := range events[:n] {
if int(event.Fd) == p.event.raw {
// Since we don't read p.event the event is never cleared and
for i := 0; i < n; {
event := events[i]
if int(event.Fd) == p.closeEvent.raw {
// Since we don't read p.closeEvent the event is never cleared and
// we'll keep getting this wakeup until Close() acquires the
// lock and sets p.epollFd = -1.
return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed)
}
if int(event.Fd) == p.flushEvent.raw {
// read event to prevent it from continuing to wake
p.flushEvent.read()
err = ErrFlushed
events = slices.Delete(events, i, i+1)
n -= 1
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the behaviour is that Wait will return 0, nil for a flush. But what about a case where another ring becomes ready simultaneously with flushEvent? In that case we'd get two events from epoll, would discard one and then return 1, nil. In that case we drop an ErrFlushed from Reader.Read which the user is expecting.

How about removing the event here, and doing return n, ErrFlushed from Wait instead? Then ringbuf and perf re-export epoll.ErrFlushed and the rest can mostly stay the same.

Copy link
Contributor Author

@brycekahle brycekahle Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we drop an ErrFlushed from Reader.Read which the user is expecting.

I don't follow. Flush is the one that sets up a pending error of ErrFlushed, which will still be returned after all the rings have been read.

The n return value from Wait isn't even used. Only if an error is returned does a different code path happen. We want Flush to behave as if a regular ring epoll event happened, which is what it is doing.

Copy link
Contributor Author

@brycekahle brycekahle Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we actually want to ensure we read all the events in case there is both a Flush and Close in the same wakeup.

}
i++
}

return n, nil
return n, err
}
}

// temporaryError is the interface satisfied by values that expose a
// Temporary() bool method.
//
// NOTE(review): presumably used to recognise retryable errors returned from
// epoll_wait; the call site is not visible in this view, confirm.
type temporaryError interface {
Temporary() bool
}

// wakeWait unblocks Wait if it's epoll_wait.
func (p *Poller) wakeWait() error {
// wakeWaitForClose signals the close eventfd so that a Wait which is
// currently blocked in epoll_wait wakes up.
//
// It holds eventMu while touching closeEvent. If closeEvent has already been
// cleared, an error wrapping os.ErrClosed is returned; otherwise the result of
// incrementing the eventfd by one is returned.
func (p *Poller) wakeWaitForClose() error {
	p.eventMu.Lock()
	defer p.eventMu.Unlock()

	if ev := p.closeEvent; ev != nil {
		return ev.add(1)
	}

	return fmt.Errorf("epoll wake: %w", os.ErrClosed)
}

// Flush unblocks Wait if it is blocked in epoll_wait, so that pending samples can be read.
func (p *Poller) Flush() error {
p.eventMu.Lock()
defer p.eventMu.Unlock()

if p.event == nil {
if p.flushEvent == nil {
return fmt.Errorf("epoll wake: %w", os.ErrClosed)
}

return p.event.add(1)
return p.flushEvent.add(1)
}

// eventFd wraps a Linux eventfd.
Expand Down
24 changes: 23 additions & 1 deletion internal/epoll/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/cilium/ebpf/internal/unix"
"github.com/go-quicktest/qt"
)

func TestPoller(t *testing.T) {
Expand Down Expand Up @@ -101,12 +102,33 @@ func TestPollerDeadline(t *testing.T) {
}()

// Wait for the goroutine to enter the syscall.
time.Sleep(time.Second)
time.Sleep(500 * time.Microsecond)

poller.Close()
<-done
}

// TestPollerFlush checks that calling Flush while another goroutine is in
// Wait makes that Wait return an error matching ErrFlushed.
func TestPollerFlush(t *testing.T) {
	t.Parallel()

	_, poller := mustNewPoller(t)
	events := make([]unix.EpollEvent, 1)

	done := make(chan struct{})
	go func() {
		defer close(done)

		_, err := poller.Wait(events, time.Time{})
		qt.Check(t, qt.ErrorIs(err, ErrFlushed))
	}()

	// Wait for the goroutine to enter the syscall.
	time.Sleep(500 * time.Microsecond)

	// Do not drop the Flush error: if the wakeup was never written, the
	// goroutine above may never be woken and the receive on done below
	// would hang instead of reporting the real failure.
	if err := poller.Flush(); err != nil {
		t.Fatal("flush:", err)
	}
	<-done
}

func mustNewPoller(t *testing.T) (*eventFd, *Poller) {
t.Helper()

Expand Down
65 changes: 35 additions & 30 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

var (
ErrClosed = os.ErrClosed
errEOR = errors.New("end of ring")
ErrClosed = os.ErrClosed
ErrFlushed = epoll.ErrFlushed
errEOR = errors.New("end of ring")
)

var perfEventHeaderSize = binary.Size(perfEventHeader{})
Expand Down Expand Up @@ -135,31 +136,27 @@ func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
// Reader allows reading bpf_perf_event_output
// from user space.
type Reader struct {
poller *epoll.Poller
deadline time.Time
poller *epoll.Poller

// mu protects read/write access to the Reader structure with the
// exception of 'pauseFds', which is protected by 'pauseMu'.
// exception of fields protected by 'pauseMu'.
// If locking both 'mu' and 'pauseMu', 'mu' must be locked first.
mu sync.Mutex

// Closing a PERF_EVENT_ARRAY removes all event fds
// stored in it, so we keep a reference alive.
array *ebpf.Map
rings []*perfEventRing
epollEvents []unix.EpollEvent
epollRings []*perfEventRing
eventHeader []byte
mu sync.Mutex
array *ebpf.Map
rings []*perfEventRing
epollEvents []unix.EpollEvent
epollRings []*perfEventRing
eventHeader []byte
deadline time.Time
overwritable bool
bufferSize int
pendingErr error

// pauseMu protects eventFds so that Pause / Resume can be invoked while
// Read is blocked.
pauseMu sync.Mutex
eventFds []*sys.FD

paused bool
overwritable bool

bufferSize int
paused bool
}

// ReaderOptions control the behaviour of the user
Expand Down Expand Up @@ -242,6 +239,8 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
}
}

// Closing a PERF_EVENT_ARRAY removes all event fds
// stored in it, so we keep a reference alive.
array, err = array.Clone()
if err != nil {
return nil, err
Expand Down Expand Up @@ -318,18 +317,18 @@ func (pr *Reader) SetDeadline(t time.Time) {

// Read the next record from the perf ring buffer.
//
// The function blocks until there are at least Watermark bytes in one
// The method blocks until there are at least Watermark bytes in one
// of the per CPU buffers. Records from buffers below the Watermark
// are not returned.
//
// Records can contain between 0 and 7 bytes of trailing garbage from the ring
// depending on the input sample's length.
//
// Calling Close interrupts the function.
// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush]
// makes it return all records currently in the ring buffer, followed by [ErrFlushed].
//
// Returns [os.ErrDeadlineExceeded] if a deadline was set and the perf ring buffer
// was empty. Otherwise returns a record and no error, even if the deadline was
// exceeded.
// Returns [os.ErrDeadlineExceeded] if a deadline was set and has expired, but
// only after all records have been read from the ring.
//
// See [Reader.ReadInto] for a more efficient version of this method.
func (pr *Reader) Read() (Record, error) {
Expand All @@ -356,13 +355,13 @@ func (pr *Reader) ReadInto(rec *Record) error {
return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
}

deadlineWasExceeded := false
for {
if len(pr.epollRings) == 0 {
if deadlineWasExceeded {
// All rings were empty when the deadline expired, return
if pe := pr.pendingErr; pe != nil {
// All rings have been emptied since the error occurred, return
// appropriate error.
return os.ErrDeadlineExceeded
pr.pendingErr = nil
return pe
}

// NB: The deferred pauseMu.Unlock will panic if Wait panics, which
Expand All @@ -371,10 +370,10 @@ func (pr *Reader) ReadInto(rec *Record) error {
_, err := pr.poller.Wait(pr.epollEvents, pr.deadline)
pr.pauseMu.Lock()

if errors.Is(err, os.ErrDeadlineExceeded) {
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
// We've hit the deadline or were flushed, check whether there is
// any data in the rings that we've not been woken up for.
deadlineWasExceeded = true
pr.pendingErr = err
} else if err != nil {
return err
}
Expand Down Expand Up @@ -463,6 +462,12 @@ func (pr *Reader) BufferSize() int {
return pr.bufferSize
}

// Flush unblocks a pending Read/ReadInto by forwarding to the poller's Flush.
//
// After Flush, successive Read/ReadInto calls return the samples that are
// pending at this point; once those are drained, a call returns [ErrFlushed].
//
// Flush itself does not lock the Reader's mutexes. It returns whatever error
// the underlying poller's Flush returns.
func (pr *Reader) Flush() error {
return pr.poller.Flush()
}

// NB: Has to be preceded by a call to ring.loadHead.
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
defer ring.writeTail()
Expand Down
Loading
Loading