From dfebbda5671c1073fa58967de4cb6dcfe316c75e Mon Sep 17 00:00:00 2001 From: Mikhail Zvonov Date: Fri, 18 Oct 2024 20:42:31 +0300 Subject: [PATCH] feature/graceful transport worker termination --- client.go | 4 ++++ transport.go | 64 ++++++++++++++++++++++++++++++++-------------------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/client.go b/client.go index b5b11b31d..64ec1792b 100644 --- a/client.go +++ b/client.go @@ -226,6 +226,10 @@ type ClientOptions struct { MaxErrorDepth int // Default event tags. These are overridden by tags set on a scope. Tags map[string]string + + // Optional chan receiving from caller signal about termination. + // Useful to prevent goroutines leak in case of multiple Transport instances initiated. + Done <-chan struct{} } // Client is the underlying processor that is used by the main API and Hub diff --git a/transport.go b/transport.go index 02fc1d4f1..eddbfda90 100644 --- a/transport.go +++ b/transport.go @@ -350,6 +350,9 @@ type HTTPTransport struct { mu sync.RWMutex limits ratelimit.Map + + // receiving struct means caller terminates. + done <-chan struct{} } // NewHTTPTransport returns a new pre-configured instance of HTTPTransport. @@ -398,6 +401,10 @@ func (t *HTTPTransport) Configure(options ClientOptions) { } } + if options.Done != nil { + t.done = options.Done + } + t.start.Do(func() { go t.worker() }) @@ -532,35 +539,44 @@ func (t *HTTPTransport) worker() { t.buffer <- b // Process all batch items. - for item := range b.items { - if t.disabled(item.category) { - continue - } + loop: + for { + select { + case <-t.done: + return + case item, open := <-b.items: + if !open { + break loop + } + if t.disabled(item.category) { + continue + } - response, err := t.client.Do(item.request) - if err != nil { - Logger.Printf("There was an issue with sending an event: %v", err) - continue - } - if response.StatusCode >= 400 && response.StatusCode <= 599 { - b, err := io.ReadAll(response.Body) + response, err := t.client.Do(item.request) if err != nil { - Logger.Printf("Error while reading response code: %v", err) + Logger.Printf("There was an issue with sending an event: %v", err) + continue + } + if response.StatusCode >= 400 && response.StatusCode <= 599 { + b, err := io.ReadAll(response.Body) + if err != nil { + Logger.Printf("Error while reading response code: %v", err) + } + Logger.Printf("Sending %s failed with the following error: %s", eventType, string(b)) } - Logger.Printf("Sending %s failed with the following error: %s", eventType, string(b)) - } - t.mu.Lock() - if t.limits == nil { - t.limits = make(ratelimit.Map) - } - t.limits.Merge(ratelimit.FromResponse(response)) - t.mu.Unlock() + t.mu.Lock() + if t.limits == nil { + t.limits = make(ratelimit.Map) + } + t.limits.Merge(ratelimit.FromResponse(response)) + t.mu.Unlock() - // Drain body up to a limit and close it, allowing the - // transport to reuse TCP connections. - _, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes) - response.Body.Close() + // Drain body up to a limit and close it, allowing the + // transport to reuse TCP connections. + _, _ = io.CopyN(io.Discard, response.Body, maxDrainResponseBytes) + response.Body.Close() + } } // Signal that processing of the batch is done.