Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf add timeout argument #736

Merged
merged 5 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions internal/epoll/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"runtime"
"sync"
"time"

"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/unix"
Expand Down Expand Up @@ -118,8 +119,8 @@ func (p *Poller) Add(fd int, id int) error {
// Wait for events.
//
// Returns the number of pending events or an error wrapping os.ErrClosed if
// Close is called.
func (p *Poller) Wait(events []unix.EpollEvent) (int, error) {
// Close is called, or os.ErrDeadlineExceeded if EpollWait timeout.
func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) {
lmb marked this conversation as resolved.
Show resolved Hide resolved
p.epollMu.Lock()
defer p.epollMu.Unlock()

Expand All @@ -128,7 +129,12 @@ func (p *Poller) Wait(events []unix.EpollEvent) (int, error) {
}

for {
n, err := unix.EpollWait(p.epollFd, events, -1)
msec := int(-1)
if !deadline.IsZero() {
msec = int(time.Until(deadline).Milliseconds())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug in my own code: this doesn't handle deadlines in the past or so far in the future that msec is overflowed.

I'll fix that up in a follow up.

}

n, err := unix.EpollWait(p.epollFd, events, msec)
if temp, ok := err.(temporaryError); ok && temp.Temporary() {
// Retry the syscall if we were interrupted, see https://github.com/golang/go/issues/20400
continue
Expand All @@ -138,6 +144,10 @@ func (p *Poller) Wait(events []unix.EpollEvent) (int, error) {
return 0, err
}

if n == 0 && msec != -1 {
return 0, fmt.Errorf("poll: %w", os.ErrDeadlineExceeded)
}

for _, event := range events[:n] {
if int(event.Fd) == p.event.raw {
// Since we don't read p.event the event is never cleared and
Expand Down
17 changes: 12 additions & 5 deletions internal/epoll/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func TestPoller(t *testing.T) {
}

done := make(chan struct{}, 1)
read := func() {
read := func(timeOut time.Time) {
defer func() {
done <- struct{}{}
}()

events := make([]unix.EpollEvent, 1)

n, err := poller.Wait(events)
if errors.Is(err, os.ErrClosed) {
n, err := poller.Wait(events, timeOut)
if errors.Is(err, os.ErrClosed) || errors.Is(err, os.ErrDeadlineExceeded) {
return
}

Expand All @@ -57,7 +57,7 @@ func TestPoller(t *testing.T) {
t.Fatal(err)
}

go read()
go read(time.Time{})
select {
case <-done:
case <-time.After(time.Second):
Expand All @@ -68,7 +68,14 @@ func TestPoller(t *testing.T) {
t.Fatal(err)
}

go read()
go read(time.Now().Add(200 * time.Millisecond))
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Timed out")
}

go read(time.Time{})
select {
case <-done:
t.Fatal("Wait doesn't block")
Expand Down
19 changes: 17 additions & 2 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"runtime"
"sync"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal"
Expand Down Expand Up @@ -133,7 +134,8 @@ func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
// Reader allows reading bpf_perf_event_output
// from user space.
type Reader struct {
poller *epoll.Poller
poller *epoll.Poller
deadline time.Time

// mu protects read/write access to the Reader structure with the
// exception of 'pauseFds', which is protected by 'pauseMu'.
Expand Down Expand Up @@ -237,6 +239,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
array: array,
rings: rings,
poller: poller,
deadline: time.Time{},
epollEvents: make([]unix.EpollEvent, len(rings)),
epollRings: make([]*perfEventRing, 0, len(rings)),
eventHeader: make([]byte, perfEventHeaderSize),
Expand Down Expand Up @@ -280,6 +283,16 @@ func (pr *Reader) Close() error {
return nil
}

// SetDeadline controls how long Read and ReadInto will block waiting for samples.
//
// Passing a zero time.Time will remove the deadline.
func (pr *Reader) SetDeadline(t time.Time) {
pr.mu.Lock()
defer pr.mu.Unlock()

pr.deadline = t
}

// Read the next record from the perf ring buffer.
//
// The function blocks until there are at least Watermark bytes in one
Expand All @@ -290,6 +303,8 @@ func (pr *Reader) Close() error {
// depending on the input sample's length.
//
// Calling Close interrupts the function.
//
// If SetDeadline the Read deadline, Read might return os.ErrDeadlineExceeded.
func (pr *Reader) Read() (Record, error) {
var r Record
return r, pr.ReadInto(&r)
Expand All @@ -306,7 +321,7 @@ func (pr *Reader) ReadInto(rec *Record) error {

for {
if len(pr.epollRings) == 0 {
nEvents, err := pr.poller.Wait(pr.epollEvents)
nEvents, err := pr.poller.Wait(pr.epollEvents, pr.deadline)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions perf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func TestPerfReader(t *testing.T) {
if record.CPU < 0 {
t.Error("Record has invalid CPU number")
}

rd.SetDeadline(time.Now().Add(4 * time.Millisecond))
_, err = rd.Read()
qt.Assert(t, errors.Is(err, os.ErrDeadlineExceeded), qt.IsTrue, qt.Commentf("expected os.ErrDeadlineExceeded"))
}

func outputSamplesProg(sampleSizes ...int) (*ebpf.Program, *ebpf.Map, error) {
Expand Down
3 changes: 2 additions & 1 deletion ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"sync"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal"
Expand Down Expand Up @@ -190,7 +191,7 @@ func (r *Reader) ReadInto(rec *Record) error {

for {
if !r.haveData {
_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)])
_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], time.Time{})
if err != nil {
return err
}
Expand Down