From 002c23dc01f3cd360007ab7309d758a0e21be48b Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 11 Nov 2020 09:49:18 +0100 Subject: [PATCH] Add `StopNow()` method to stop the client without retries. Use `StopNow()` from the docker-driver --- cmd/docker-driver/config.go | 4 +++- cmd/docker-driver/loki.go | 2 +- cmd/docker-driver/main.go | 4 +++- cmd/fluent-bit/dque.go | 6 ++++++ cmd/fluent-bit/loki_test.go | 2 ++ pkg/promtail/client/client.go | 24 +++++++++++++++++------- pkg/promtail/client/fake/client.go | 5 +++++ pkg/promtail/client/logger.go | 2 ++ pkg/promtail/client/multi.go | 7 +++++++ 9 files changed, 46 insertions(+), 10 deletions(-) diff --git a/cmd/docker-driver/config.go b/cmd/docker-driver/config.go index 9676d5defe28..f34a28168943 100644 --- a/cmd/docker-driver/config.go +++ b/cmd/docker-driver/config.go @@ -74,7 +74,9 @@ var ( MaxBackoff: client.MaxBackoff, MaxRetries: client.MaxRetries, }, - Timeout: client.Timeout, + // Avoid blocking the docker-driver on the worst case + // https://github.com/grafana/loki/pull/2898#issuecomment-730218963 + Timeout: 5 * time.Second, } ) diff --git a/cmd/docker-driver/loki.go b/cmd/docker-driver/loki.go index 11a5a7b8c618..6496bf380e3f 100644 --- a/cmd/docker-driver/loki.go +++ b/cmd/docker-driver/loki.go @@ -70,6 +70,6 @@ func (l *loki) Name() string { // Log implements `logger.Logger` func (l *loki) Close() error { - l.client.Stop() + l.client.StopNow() return nil } diff --git a/cmd/docker-driver/main.go b/cmd/docker-driver/main.go index 6dd8d1f87b4f..3f76ed7fc687 100644 --- a/cmd/docker-driver/main.go +++ b/cmd/docker-driver/main.go @@ -30,12 +30,14 @@ func main() { } logger := newLogger(logLevel) level.Info(util.Logger).Log("msg", "Starting docker-plugin", "version", version.Info()) + h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`) + handlers(&h, newDriver(logger)) + if err := h.ServeUnix(socketAddress, 0); err != nil { panic(err) } - } func newLogger(lvl logging.Level) log.Logger { diff --git a/cmd/fluent-bit/dque.go b/cmd/fluent-bit/dque.go index 66d2cb55ed9b..8636eb2ef4e5 100644 --- a/cmd/fluent-bit/dque.go +++ b/cmd/fluent-bit/dque.go @@ -109,6 +109,12 @@ func (c *dqueClient) Stop() { c.loki.Stop() } +// Stop the client +func (c *dqueClient) StopNow() { + c.once.Do(func() { c.queue.Close() }) + c.loki.StopNow() +} + // Handle implement EntryHandler; adds a new line to the next batch; send is async. func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error { if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil { diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index d687ee12df3f..3c3f437a75b1 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -33,6 +33,8 @@ func (r *recorder) toEntry() *entry { return r.entry } func (r *recorder) Stop() {} +func (r *recorder) StopNow() {} + var now = time.Now() func Test_loki_sendRecord(t *testing.T) { diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index a9de17e2e281..ca87c0881943 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -112,14 +112,20 @@ type Client interface { api.EntryHandler // Stop goroutine sending batch of entries. Stop() + + // Stop goroutine sending batch of entries without retries. + StopNow() } // Client for pushing logs in snappy-compressed protos over HTTP. type client struct { - logger log.Logger - cfg Config - client *http.Client - quit chan struct{} + logger log.Logger + cfg Config + client *http.Client + + // quit chan is depricated. Will be removed. Use `client.ctx` and `client.cancel` instead. + quit chan struct{} + once sync.Once entries chan entry wg sync.WaitGroup @@ -356,13 +362,17 @@ func (c *client) getTenantID(labels model.LabelSet) string { // Stop the client. func (c *client) Stop() { - // cancel any upstream calls made using client's `ctx`. - c.cancel() - c.once.Do(func() { close(c.quit) }) c.wg.Wait() } +// StopNow stops the client without retries +func (c *client) StopNow() { + // cancel any upstream calls made using client's `ctx`. + c.cancel() + c.Stop() +} + // Handle implement EntryHandler; adds a new line to the next batch; send is async. func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { if len(c.externalLabels) > 0 { diff --git a/pkg/promtail/client/fake/client.go b/pkg/promtail/client/fake/client.go index b104388912c2..ab7bd1686c00 100644 --- a/pkg/promtail/client/fake/client.go +++ b/pkg/promtail/client/fake/client.go @@ -19,6 +19,11 @@ func (c *Client) Stop() { c.OnStop() } +// StopNow implements client.Client +func (c *Client) StopNow() { + c.OnStop() +} + // Handle implements client.Client func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error { return c.OnHandleEntry.Handle(labels, time, entry) diff --git a/pkg/promtail/client/logger.go b/pkg/promtail/client/logger.go index b2ec8cac6206..dc164fa1141b 100644 --- a/pkg/promtail/client/logger.go +++ b/pkg/promtail/client/logger.go @@ -58,6 +58,8 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) func (*logger) Stop() {} +func (*logger) StopNow() {} + func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error { l.Lock() defer l.Unlock() diff --git a/pkg/promtail/client/multi.go b/pkg/promtail/client/multi.go index a5f2c63e47b0..0e8720ca5da3 100644 --- a/pkg/promtail/client/multi.go +++ b/pkg/promtail/client/multi.go @@ -55,3 +55,10 @@ func (m MultiClient) Stop() { c.Stop() } } + +// StopNow implements Client +func (m MultiClient) StopNow() { + for _, c := range m { + c.StopNow() + } +}