Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(docker-driver): Propagate promtail's client.Stop properly #2898

Merged
merged 5 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ helm-clean:
#################

# optionally set the tag or the arch suffix (-arm64)
LOKI_DOCKER_DRIVER ?= "grafana/loki-docker-driver"
PLUGIN_TAG ?= $(IMAGE_TAG)
PLUGIN_ARCH ?=

Expand All @@ -393,24 +394,24 @@ docker-driver: docker-driver-clean
(docker export $$ID | tar -x -C cmd/docker-driver/rootfs) && \
docker rm -vf $$ID
docker rmi rootfsimage -f
docker plugin create grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH) cmd/docker-driver
docker plugin create grafana/loki-docker-driver:latest$(PLUGIN_ARCH) cmd/docker-driver
docker plugin create $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH) cmd/docker-driver
docker plugin create $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH) cmd/docker-driver

cmd/docker-driver/docker-driver: $(APP_GO_FILES)
CGO_ENABLED=0 go build $(GO_FLAGS) -o $@ ./$(@D)
$(NETGO_CHECK)

docker-driver-push: docker-driver
docker plugin push grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
docker plugin push grafana/loki-docker-driver:latest$(PLUGIN_ARCH)
docker plugin push $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)
docker plugin push $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH)

docker-driver-enable:
docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
docker plugin enable $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)

docker-driver-clean:
-docker plugin disable grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm grafana/loki-docker-driver:$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm grafana/loki-docker-driver:latest$(PLUGIN_ARCH)
-docker plugin disable $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm $(LOKI_DOCKER_DRIVER):$(PLUGIN_TAG)$(PLUGIN_ARCH)
-docker plugin rm $(LOKI_DOCKER_DRIVER):latest$(PLUGIN_ARCH)
rm -rf cmd/docker-driver/rootfs

#####################
Expand Down
2 changes: 1 addition & 1 deletion cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ func (l *loki) Name() string {

// Log implements `logger.Logger`
func (l *loki) Close() error {
l.client.Stop()
l.client.StopNow()
return nil
}
4 changes: 3 additions & 1 deletion cmd/docker-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ func main() {
}
logger := newLogger(logLevel)
level.Info(util.Logger).Log("msg", "Starting docker-plugin", "version", version.Info())

h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)

handlers(&h, newDriver(logger))

if err := h.ServeUnix(socketAddress, 0); err != nil {
panic(err)
}

}

func newLogger(lvl logging.Level) log.Logger {
Expand Down
6 changes: 6 additions & 0 deletions cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (c *dqueClient) Stop() {
c.loki.Stop()
}

// Stop the client
func (c *dqueClient) StopNow() {
c.once.Do(func() { c.queue.Close() })
c.loki.StopNow()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/fluent-bit/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (r *recorder) toEntry() *entry { return r.entry }

func (r *recorder) Stop() {}

func (r *recorder) StopNow() {}

var now = time.Now()

func Test_loki_sendRecord(t *testing.T) {
Expand Down
43 changes: 35 additions & 8 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,29 @@ type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries.
Stop()

// Stop goroutine sending batch of entries without retries.
StopNow()
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
logger log.Logger
cfg Config
client *http.Client
quit chan struct{}
logger log.Logger
cfg Config
client *http.Client

// quit chan is depricated. Will be removed. Use `client.ctx` and `client.cancel` instead.
quit chan struct{}

once sync.Once
entries chan entry
wg sync.WaitGroup

externalLabels model.LabelSet

// ctx is used in any upstream calls from the `client`.
ctx context.Context
cancel context.CancelFunc
}

type entry struct {
Expand All @@ -139,13 +149,17 @@ func New(cfg Config, logger log.Logger) (Client, error) {
return nil, errors.New("client needs target URL")
}

ctx, cancel := context.WithCancel(context.Background())

c := &client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
}

err := cfg.Client.Validate()
Expand Down Expand Up @@ -246,12 +260,13 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

ctx := context.Background()
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
for {
start := time.Now()
status, err = c.send(ctx, tenantID, buf)
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(context.Background(), tenantID, buf)

requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
Expand Down Expand Up @@ -288,6 +303,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()

// Make sure it sends at least once before checking for retry.
if !backoff.Ongoing() {
break
}
}

if err != nil {
Expand Down Expand Up @@ -353,6 +373,13 @@ func (c *client) Stop() {
c.wg.Wait()
}

// StopNow stops the client without retries
func (c *client) StopNow() {
// cancel any upstream calls made using client's `ctx`.
c.cancel()
c.Stop()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
Expand Down
141 changes: 141 additions & 0 deletions pkg/promtail/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,147 @@ func TestClient_Handle(t *testing.T) {
}
}

func TestClient_StopNow(t *testing.T) {
cases := []struct {
name string
clientBatchSize int
clientBatchWait time.Duration
clientMaxRetries int
clientTenantID string
serverResponseStatus int
inputEntries []entry
inputDelay time.Duration
expectedReqs []receivedReq
expectedMetrics string
}{
{
name: "send requests shouldn't be cancelled after StopNow()",
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 200,
inputEntries: []entry{logEntries[0], logEntries[1], logEntries[2]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
},
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 0
`,
},
{
name: "shouldn't retry after StopNow()",
clientBatchSize: 10,
clientBatchWait: 10 * time.Millisecond,
clientMaxRetries: 3,
serverResponseStatus: 429,
inputEntries: []entry{logEntries[0]},
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
`,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()

// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan receivedReq, 10)

// Start a local HTTP server
server := httptest.NewServer(createServerHandler(receivedReqsChan, c.serverResponseStatus))
require.NotNil(t, server)
defer server.Close()

// Get the URL at which the local test server is listening to
serverURL := flagext.URLValue{}
err := serverURL.Set(server.URL)
require.NoError(t, err)

// Instance the client
cfg := Config{
URL: serverURL,
BatchWait: c.clientBatchWait,
BatchSize: c.clientBatchSize,
Client: config.HTTPClientConfig{},
BackoffConfig: util.BackoffConfig{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: c.clientMaxRetries},
ExternalLabels: lokiflag.LabelSet{},
Timeout: 1 * time.Second,
TenantID: c.clientTenantID,
}

cl, err := New(cfg, log.NewNopLogger())
require.NoError(t, err)

// Send all the input log entries
for i, logEntry := range c.inputEntries {
err = cl.Handle(logEntry.labels, logEntry.Timestamp, logEntry.Line)
require.NoError(t, err)

if c.inputDelay > 0 && i < len(c.inputEntries)-1 {
time.Sleep(c.inputDelay)
}
}

// Wait until the expected push requests are received (with a timeout)
deadline := time.Now().Add(1 * time.Second)
for len(receivedReqsChan) < len(c.expectedReqs) && time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
}

// StopNow should have cancelled client's ctx
cc := cl.(*client)
require.NoError(t, cc.ctx.Err())

// Stop the client: it waits until the current batch is sent
cl.StopNow()
close(receivedReqsChan)

require.Error(t, cc.ctx.Err()) // non-nil error if its cancelled.

// Get all push requests received on the server side
receivedReqs := make([]receivedReq, 0)
for req := range receivedReqsChan {
receivedReqs = append(receivedReqs, req)
}

// Due to implementation details (maps iteration ordering is random) we just check
// that the expected requests are equal to the received requests, without checking
// the exact order which is not guaranteed in case of multi-tenant
require.ElementsMatch(t, c.expectedReqs, receivedReqs)

expectedMetrics := strings.Replace(c.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
}

func createServerHandler(receivedReqsChan chan receivedReq, status int) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Parse the request
Expand Down
5 changes: 5 additions & 0 deletions pkg/promtail/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func (c *Client) Stop() {
c.OnStop()
}

// StopNow implements client.Client
func (c *Client) StopNow() {
c.OnStop()
}

// Handle implements client.Client
func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error {
return c.OnHandleEntry.Handle(labels, time, entry)
Expand Down
2 changes: 2 additions & 0 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config)

func (*logger) Stop() {}

func (*logger) StopNow() {}

func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error {
l.Lock()
defer l.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ func (m MultiClient) Stop() {
c.Stop()
}
}

// StopNow implements Client
func (m MultiClient) StopNow() {
for _, c := range m {
c.StopNow()
}
}