Skip to content

Commit

Permalink
impr(promtail/client): Add cancelling mechanism for batchSend
Browse files Browse the repository at this point in the history
stores `context` and its `cancel` in client and uses it to
make upstream calls that need `ctx`.

Also properly cancel's all the upstream calls that using `ctx` when
client's Stop is called.
  • Loading branch information
kavirajk committed Nov 23, 2020
1 parent e30a37f commit 129ca28
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ type client struct {
wg sync.WaitGroup

externalLabels model.LabelSet

// ctx is used in any upstream calls from the `client`.
ctx context.Context
cancel context.CancelFunc
}

type entry struct {
Expand All @@ -139,13 +143,17 @@ func New(cfg Config, logger log.Logger) (Client, error) {
return nil, errors.New("client needs target URL")
}

ctx, cancel := context.WithCancel(context.Background())

c := &client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
}

err := cfg.Client.Validate()
Expand Down Expand Up @@ -246,12 +254,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

ctx := context.Background()
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
start := time.Now()
status, err = c.send(ctx, tenantID, buf)
status, err = c.send(c.ctx, tenantID, buf)
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
Expand Down Expand Up @@ -349,6 +356,9 @@ func (c *client) getTenantID(labels model.LabelSet) string {

// Stop the client.
func (c *client) Stop() {
// cancel any upstream calls made using client's `ctx`.
c.cancel()

c.once.Do(func() { close(c.quit) })
c.wg.Wait()
}
Expand Down

0 comments on commit 129ca28

Please sign in to comment.