Skip to content

Commit

Permalink
Fix shutdown panic (#980)
Browse files Browse the repository at this point in the history
* Fix shutdown panic

* Add TestRunStopping

* Add changelog entry

* Consolidate testing variables

* Add comment for wait/close
  • Loading branch information
MrAlias authored Aug 12, 2024
1 parent e9c44f7 commit 5daba70
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 39 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http
- Support Go `v1.22.6`. ([#988](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/988))
- Support `golang.org/x/net` `v0.28.0`. ([#988](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/988))

### Fixed

- Fix dirty shutdown caused by panic. ([#980](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/980))

## [v0.14.0-alpha] - 2024-07-15

### Added
Expand Down
44 changes: 42 additions & 2 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
"github.com/go-logr/stdr"
Expand Down Expand Up @@ -57,6 +58,10 @@ type Instrumentation struct {
target *process.TargetDetails
analyzer *process.Analyzer
manager *instrumentation.Manager

stopMu sync.Mutex
stop context.CancelFunc
stopped chan struct{}
}

// Error message returned when instrumentation is launched without a valid target
Expand Down Expand Up @@ -158,13 +163,48 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
}

// Run starts the instrumentation.
//
// This function will not return until either ctx is done, an unrecoverable
// error is encountered, or Close is called.
func (i *Instrumentation) Run(ctx context.Context) error {
return i.manager.Run(ctx, i.target)
ctx, err := i.newStop(ctx)
if err != nil {
return err
}

err = i.manager.Run(ctx, i.target)
close(i.stopped)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil
}
return err
}

func (i *Instrumentation) newStop(parent context.Context) (context.Context, error) {
i.stopMu.Lock()
defer i.stopMu.Unlock()

if i.stop != nil {
return parent, errors.New("instrumentation already running")
}

ctx, stop := context.WithCancel(parent)
i.stop, i.stopped = stop, make(chan struct{})
return ctx, nil
}

// Close closes the Instrumentation, cleaning up all used resources.
func (i *Instrumentation) Close() error {
return i.manager.Close()
i.stopMu.Lock()
defer i.stopMu.Unlock()

if i.stop != nil {
i.stop()
<-i.stopped

i.stop, i.stopped = nil, nil
}
return nil
}

// InstrumentationOption applies a configuration option to [Instrumentation].
Expand Down
63 changes: 26 additions & 37 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ import (
"go.opentelemetry.io/auto/internal/pkg/process"
)

// Function variables overridden in testing.
var (
openExecutable = link.OpenExecutable
rlimitRemoveMemlock = rlimit.RemoveMemlock
bpffsMount = bpffs.Mount
bpffsCleanup = bpffs.Cleanup
)

// Manager handles the management of [probe.Probe] instances.
type Manager struct {
logger logr.Logger
probes map[probe.ID]probe.Probe
done chan bool
incomingEvents chan *probe.Event
otelController *opentelemetry.Controller
globalImpl bool
wg sync.WaitGroup
closingErrors chan error
loadedIndicator chan struct{}
}

Expand All @@ -46,11 +50,8 @@ func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, gl
m := &Manager{
logger: logger,
probes: make(map[probe.ID]probe.Probe),
done: make(chan bool, 1),
incomingEvents: make(chan *probe.Event),
otelController: otelController,
globalImpl: globalImpl,
closingErrors: make(chan error, 1),
loadedIndicator: loadIndicator,
}

Expand Down Expand Up @@ -135,22 +136,21 @@ func (m *Manager) FilterUnusedProbes(target *process.TargetDetails) {
// Run runs the event processing loop for all managed probes.
func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error {
if len(m.probes) == 0 {
err := errors.New("no instrumentation for target process")
close(m.closingErrors)
return err
return errors.New("no instrumentation for target process")
}

err := m.load(target)
if err != nil {
close(m.closingErrors)
return err
}

m.wg.Add(len(m.probes))
eventCh := make(chan *probe.Event)
var wg sync.WaitGroup
for _, i := range m.probes {
wg.Add(1)
go func(p probe.Probe) {
defer m.wg.Done()
p.Run(m.incomingEvents)
defer wg.Done()
p.Run(eventCh)
}(i)
}

Expand All @@ -161,29 +161,27 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error
for {
select {
case <-ctx.Done():
m.logger.V(1).Info("shutting down all probes due to context cancellation")
err := m.cleanup(target)
err = errors.Join(err, ctx.Err())
m.closingErrors <- err
return nil
case <-m.done:
m.logger.V(1).Info("shutting down all probes due to signal")
m.logger.V(1).Info("Shutting down all probes")
err := m.cleanup(target)
m.closingErrors <- err
return nil
case e := <-m.incomingEvents:

// Wait for all probes to stop before closing the chan they send on.
wg.Wait()
close(eventCh)

return errors.Join(err, ctx.Err())
case e := <-eventCh:
m.otelController.Trace(e)
}
}
}

func (m *Manager) load(target *process.TargetDetails) error {
// Remove resource limits for kernels <5.11.
if err := rlimit.RemoveMemlock(); err != nil {
if err := rlimitRemoveMemlock(); err != nil {
return err
}

exe, err := link.OpenExecutable(fmt.Sprintf("/proc/%d/exe", target.PID))
exe, err := openExecutable(fmt.Sprintf("/proc/%d/exe", target.PID))
if err != nil {
return err
}
Expand Down Expand Up @@ -212,26 +210,17 @@ func (m *Manager) mount(target *process.TargetDetails) error {
} else {
m.logger.V(1).Info("Mounting bpffs")
}
return bpffs.Mount(target)
return bpffsMount(target)
}

func (m *Manager) cleanup(target *process.TargetDetails) error {
var err error
close(m.incomingEvents)
for _, i := range m.probes {
err = errors.Join(err, i.Close())
}

m.logger.V(1).Info("Cleaning bpffs")
return errors.Join(err, bpffs.Cleanup(target))
}

// Close closes m.
func (m *Manager) Close() error {
m.done <- true
err := <-m.closingErrors
m.wg.Wait()
return err
return errors.Join(err, bpffsCleanup(target))
}

func (m *Manager) registerProbes() error {
Expand Down
87 changes: 87 additions & 0 deletions internal/pkg/instrumentation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package instrumentation

import (
"context"
"log"
"os"
"testing"
"time"

"github.com/cilium/ebpf/link"
"github.com/go-logr/stdr"
"github.com/hashicorp/go-version"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -186,3 +189,87 @@ func fakeManager(t *testing.T) *Manager {

return m
}

func TestRunStopping(t *testing.T) {
probeStop := make(chan struct{})
p := newSlowProbe(probeStop)

logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))
logger = logger.WithName("Instrumentation")

m := &Manager{
logger: logger.WithName("Manager"),
probes: map[probe.ID]probe.Probe{{}: p},
}

origOpenExecutable := openExecutable
openExecutable = func(string) (*link.Executable, error) { return nil, nil }
t.Cleanup(func() { openExecutable = origOpenExecutable })

origRlimitRemoveMemlock := rlimitRemoveMemlock
rlimitRemoveMemlock = func() error { return nil }
t.Cleanup(func() { rlimitRemoveMemlock = origRlimitRemoveMemlock })

origBpffsMount := bpffsMount
bpffsMount = func(*process.TargetDetails) error { return nil }
t.Cleanup(func() { bpffsMount = origBpffsMount })

origBpffsCleanup := bpffsCleanup
bpffsCleanup = func(*process.TargetDetails) error { return nil }
t.Cleanup(func() { bpffsCleanup = origBpffsCleanup })

ctx, stopCtx := context.WithCancel(context.Background())
errCh := make(chan error, 1)
go func() { errCh <- m.Run(ctx, &process.TargetDetails{PID: 1000}) }()

assert.NotPanics(t, func() {
stopCtx()
assert.Eventually(t, func() bool {
select {
case <-p.closeSignal:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
close(probeStop)
})

var err error
assert.Eventually(t, func() bool {
select {
case err = <-errCh:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
assert.ErrorIs(t, err, context.Canceled, "Stopping Run error")
}

type slowProbe struct {
probe.Probe

closeSignal chan struct{}
stop chan struct{}
}

func newSlowProbe(stop chan struct{}) slowProbe {
return slowProbe{
closeSignal: make(chan struct{}),
stop: stop,
}
}

func (p slowProbe) Load(*link.Executable, *process.TargetDetails) error {
return nil
}

func (p slowProbe) Run(c chan<- *probe.Event) {
}

func (p slowProbe) Close() error {
p.closeSignal <- struct{}{}
<-p.stop
return nil
}

0 comments on commit 5daba70

Please sign in to comment.