Skip to content

Commit

Permalink
addworker notification for preventing goroutines leak (#894)
Browse files Browse the repository at this point in the history
  • Loading branch information
manc88 authored Dec 11, 2024
1 parent e132c54 commit 3e6309a
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 24 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ The Sentry SDK team is happy to announce the immediate availability of Sentry Go

### Bug Fixes

- Add support for closing worker goroutines started by HTTPTranport to prevent goroutine leaks ([#894](https://github.com/getsentry/sentry-go/pull/894)).

```go
client, _ := sentry.NewClient()
defer client.Close()
```

Worker can be also closed by calling `Close()` method on the `HTTPTransport` instance. `Close` should be called after `Flush` and before terminating the program otherwise some events may be lost.

```go
transport := sentry.NewHTTPTransport()
defer transport.Close()
```
- Prevent panic in `fasthttp` and `fiber` integration in case a malformed URL has to be parsed ([#912](https://github.com/getsentry/sentry-go/pull/912))

### Misc
Expand Down
8 changes: 8 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,14 @@ func (client *Client) Flush(timeout time.Duration) bool {
return client.Transport.Flush(timeout)
}

// Close clean up underlying Transport resources.
//
// Close should be called after Flush and before terminating the program
// otherwise some events may be lost.
func (client *Client) Close() {
client.Transport.Close()
}

// EventFromMessage creates an event from the given message string.
func (client *Client) EventFromMessage(message string, level Level) *Event {
if message == "" {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/urfave/negroni/v3 v3.1.1
github.com/valyala/fasthttp v1.52.0
go.uber.org/goleak v1.3.0
golang.org/x/sys v0.18.0
golang.org/x/text v0.14.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ github.com/yudai/pp v2.0.1+incompatible h1:Q4//iY4pNF6yPLZIigmvcl7k/bPgrcTPIFIcm
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
2 changes: 2 additions & 0 deletions mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ func (t *TransportMock) Events() []*Event {
defer t.mu.Unlock()
return t.events
}
func (t *TransportMock) Close() {}

2 changes: 2 additions & 0 deletions otel/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ go.opentelemetry.io/otel/sdk v1.11.0 h1:ZnKIL9V9Ztaq+ME43IUi/eo22mNsb6a7tGfzaOWB
go.opentelemetry.io/otel/sdk v1.11.0/go.mod h1:REusa8RsyKaq0OlyangWXaw97t2VogoO4SSEeKkSTAk=
go.opentelemetry.io/otel/trace v1.11.0 h1:20U/Vj42SX+mASlXLmSGBg6jpI1jQtv682lZtTAOVFI=
go.opentelemetry.io/otel/trace v1.11.0/go.mod h1:nyYjis9jy0gytE9LXGU+/m1sHTKbRY0fX0hulNNDP1U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
2 changes: 2 additions & 0 deletions otel/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,6 @@ func (t *TransportMock) Events() []*sentry.Event {
return t.events
}

func (t *TransportMock) Close() {}

//
2 changes: 2 additions & 0 deletions slog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
75 changes: 51 additions & 24 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Transport interface {
Flush(timeout time.Duration) bool
Configure(options ClientOptions)
SendEvent(event *Event)
Close()
}

func getProxyConfig(options ClientOptions) func(*http.Request) (*url.URL, error) {
Expand Down Expand Up @@ -289,13 +290,17 @@ type HTTPTransport struct {

mu sync.RWMutex
limits ratelimit.Map

// receiving signal will terminate worker.
done chan struct{}
}

// NewHTTPTransport returns a new pre-configured instance of HTTPTransport.
func NewHTTPTransport() *HTTPTransport {
transport := HTTPTransport{
BufferSize: defaultBufferSize,
Timeout: defaultTimeout,
done: make(chan struct{}),
}
return &transport
}
Expand Down Expand Up @@ -461,6 +466,15 @@ fail:
return false
}

// Close will terminate events sending loop.
// It useful to prevent goroutines leak in case of multiple HTTPTransport instances initiated.
//
// Close should be called after Flush and before terminating the program
// otherwise some events may be lost.
func (t *HTTPTransport) Close() {
close(t.done)
}

func (t *HTTPTransport) worker() {
for b := range t.buffer {
// Signal that processing of the current batch has started.
Expand All @@ -471,35 +485,44 @@ func (t *HTTPTransport) worker() {
t.buffer <- b

// Process all batch items.
for item := range b.items {
if t.disabled(item.category) {
continue
}
loop:
for {
select {
case <-t.done:
return
case item, open := <-b.items:
if !open {
break loop
}
if t.disabled(item.category) {
continue
}

response, err := t.client.Do(item.request)
if err != nil {
Logger.Printf("There was an issue with sending an event: %v", err)
continue
}
if response.StatusCode >= 400 && response.StatusCode <= 599 {
b, err := io.ReadAll(response.Body)
response, err := t.client.Do(item.request)
if err != nil {
Logger.Printf("Error while reading response code: %v", err)
Logger.Printf("There was an issue with sending an event: %v", err)
continue
}
if response.StatusCode >= 400 && response.StatusCode <= 599 {
b, err := io.ReadAll(response.Body)
if err != nil {
Logger.Printf("Error while reading response code: %v", err)
}
Logger.Printf("Sending %s failed with the following error: %s", eventType, string(b))
}
Logger.Printf("Sending %s failed with the following error: %s", eventType, string(b))
}

t.mu.Lock()
if t.limits == nil {
t.limits = make(ratelimit.Map)
}
t.limits.Merge(ratelimit.FromResponse(response))
t.mu.Unlock()
t.mu.Lock()
if t.limits == nil {
t.limits = make(ratelimit.Map)
}
t.limits.Merge(ratelimit.FromResponse(response))
t.mu.Unlock()

// Drain body up to a limit and close it, allowing the
// transport to reuse TCP connections.
_, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes)
response.Body.Close()
// Drain body up to a limit and close it, allowing the
// transport to reuse TCP connections.
_, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes)
response.Body.Close()
}
}

// Signal that processing of the batch is done.
Expand Down Expand Up @@ -587,6 +610,8 @@ func (t *HTTPSyncTransport) SendEvent(event *Event) {
t.SendEventWithContext(context.Background(), event)
}

func (t *HTTPSyncTransport) Close() {}

// SendEventWithContext assembles a new packet out of Event and sends it to the remote server.
func (t *HTTPSyncTransport) SendEventWithContext(ctx context.Context, event *Event) {
if t.dsn == nil {
Expand Down Expand Up @@ -680,3 +705,5 @@ func (noopTransport) SendEvent(*Event) {
func (noopTransport) Flush(time.Duration) bool {
return true
}

func (noopTransport) Close() {}
39 changes: 39 additions & 0 deletions transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/getsentry/sentry-go/internal/testutils"
"github.com/google/go-cmp/cmp"
"go.uber.org/goleak"
)

type unserializableType struct {
Expand Down Expand Up @@ -629,3 +630,41 @@ func testRateLimiting(t *testing.T, tr Transport) {
t.Errorf("got transactionEvent = %d, want %d", n, 1)
}
}

func TestHTTPTransportDoesntLeakGoroutines(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

transport := NewHTTPTransport()
transport.Configure(ClientOptions{
Dsn: "https://test@foobar/1",
HTTPClient: http.DefaultClient,
})

transport.Flush(0)
transport.Close()
}

func TestHTTPTransportClose(t *testing.T) {
transport := NewHTTPTransport()
transport.Configure(ClientOptions{
Dsn: "https://test@foobar/1",
HTTPClient: http.DefaultClient,
})

transport.Close()

select {
case <-transport.done:
case <-time.After(time.Second):
t.Error("transport.done not closed after Close")
}
}

func TestHTTPSyncTransportClose(t *testing.T) {
// Close does not do anything for HTTPSyncTransport, added for coverage.
transport := HTTPSyncTransport{}
transport.Close()

tr := noopTransport{}
tr.Close()
}
2 changes: 2 additions & 0 deletions zerolog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down

0 comments on commit 3e6309a

Please sign in to comment.