diff --git a/pkg/helpers/logerror.go b/pkg/helpers/logerror.go index 3a2fc25b2aa7..3083f97d39a7 100644 --- a/pkg/helpers/logerror.go +++ b/pkg/helpers/logerror.go @@ -1,6 +1,8 @@ package helpers import ( + "context" + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" ) @@ -11,3 +13,10 @@ func LogError(message string, f func() error) { level.Error(util.Logger).Log("message", message, "error", err) } } + +// LogError logs any error returned by f; useful when defering Close etc. +func LogErrorWithContext(ctx context.Context, message string, f func() error) { + if err := f(); err != nil { + level.Error(util.WithContext(ctx, util.Logger)).Log("message", message, "error", err) + } +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 78bfe3c148f5..41e5fa8cdf02 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -298,7 +298,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction) - defer helpers.LogError("closing iterator", heapItr.Close) + defer helpers.LogErrorWithContext(ctx, "closing iterator", heapItr.Close) return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit) } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index a5fd390acec3..2e8de1b05bb5 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -109,7 +109,7 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { return nil } -func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error { +func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error { var lastChunkTimestamp time.Time if len(s.chunks) == 0 { s.chunks = append(s.chunks, chunkDesc{ @@ -145,7 +145,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe if err != nil { // This should be an unlikely situation, returning an error up the stack doesn't help much here // so instead log this to help debug the issue if it ever arises. - level.Error(util.Logger).Log("msg", "failed to Close chunk", "err", err) + level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to Close chunk", "err", err) } chunk.closed = true diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 6c2313a5bdc8..647ce0822aab 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -94,7 +94,7 @@ func (t *tailer) loop() { if err != nil { // Don't log any error due to tail client closing the connection if !util.IsConnCanceled(err) { - level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err) + level.Error(cortex_util.WithContext(t.conn.Context(), cortex_util.Logger)).Log("msg", "Error writing to tail client", "err", err) } t.close() return diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 9cc91ec20d61..9aa3e384a68a 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -36,6 +36,7 @@ var ( // TransferChunks receives all chunks from another ingester. The Ingester // must be in PENDING state or else the call will fail. func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error { + logger := util.WithContext(stream.Context(), util.Logger) // Prevent a shutdown from happening until we've completely finished a handoff // from a leaving ingester. i.shutdownMtx.Lock() @@ -54,12 +55,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) return } - level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state) + level.Error(logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state) // Enter PENDING state (only valid from JOINING) if i.lifecycler.GetState() == ring.JOINING { if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil { - level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err) + level.Error(logger).Log("msg", "error rolling back failed TransferChunks", "err", err) os.Exit(1) } } @@ -81,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) // this loop. if fromIngesterID == "" { fromIngesterID = chunkSet.FromIngesterId - level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) + level.Info(logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) // Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID) @@ -109,10 +110,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) } if seriesReceived == 0 { - level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID) + level.Error(logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID) return fmt.Errorf("no series") } else if fromIngesterID == "" { - level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester") + level.Error(logger).Log("msg", "received TransferChunks request with no ID from ingester") return fmt.Errorf("no ingester id") } @@ -127,10 +128,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) // Close the stream last, as this is what tells the "from" ingester that // it's OK to shut down. if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil { - level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err) + level.Error(logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err) return err } - level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived) + level.Info(logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived) return nil } @@ -188,7 +189,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error { return nil } - level.Error(util.Logger).Log("msg", "transfer failed", "err", err) + level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "transfer failed", "err", err) backoff.Wait() } @@ -196,18 +197,19 @@ func (i *Ingester) TransferOut(ctx context.Context) error { } func (i *Ingester) transferOut(ctx context.Context) error { + logger := util.WithContext(ctx, util.Logger) targetIngester, err := i.findTransferTarget(ctx) if err != nil { return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err) } - level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr) + level.Info(logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr) c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr) if err != nil { return err } if c, ok := c.(io.Closer); ok { - defer helpers.LogError("closing client", c.Close) + defer helpers.LogErrorWithContext(ctx, "closing client", c.Close) } ic := c.(logproto.IngesterClient) @@ -244,7 +246,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { FromIngesterId: i.lifecycler.ID, }) if err != nil { - level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) + level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) return err } @@ -262,7 +264,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { } i.flushQueuesDone.Wait() - level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr) + level.Info(logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr) return nil } diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 31404287deeb..5a09b2022f8c 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -201,7 +201,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) { if err != nil { return nil, err } - defer helpers.LogError("closing iterator", iter.Close) + defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close) streams, err := readStreams(iter, q.limit, q.direction, q.interval) return streams, err } @@ -219,7 +219,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr if err != nil { return nil, err } - defer helpers.LogError("closing SampleExpr", stepEvaluator.Close) + defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close) seriesIndex := map[uint64]*promql.Series{} diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 0dd392d2a1db..6eb2703e63e9 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -63,9 +63,10 @@ var ( ) func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) { + logger := util.WithContext(ctx, util.Logger) queryType, err := QueryType(p.Query()) if err != nil { - level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err) + level.Warn(logger).Log("msg", "error parsing query type", "err", err) } rt := string(GetRangeType(p)) @@ -77,10 +78,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res } // we also log queries, useful for troubleshooting slow queries. - level.Info( - // ensure we have traceID & orgId - util.WithContext(ctx, util.Logger), - ).Log( + level.Info(logger).Log( "latency", latencyType, // this can be used to filter log lines. "query", p.Query(), "query_type", queryType, diff --git a/pkg/logql/stats/grpc.go b/pkg/logql/stats/grpc.go index e1640fda5edc..4af89670b247 100644 --- a/pkg/logql/stats/grpc.go +++ b/pkg/logql/stats/grpc.go @@ -47,7 +47,7 @@ func CollectTrailer(ctx context.Context) grpc.CallOption { func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) { trailer, err := encodeTrailer(ctx) if err != nil { - level.Warn(util.Logger).Log("msg", "failed to encode trailer", "err", err) + level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "failed to encode trailer", "err", err) return } stream.SetTrailer(trailer) @@ -92,7 +92,7 @@ func decodeTrailers(ctx context.Context) Result { } res.Ingester.TotalReached = int32(len(collector.trailers)) for _, meta := range collector.trailers { - ing := decodeTrailer(meta) + ing := decodeTrailer(ctx, meta) res.Ingester.TotalChunksMatched += ing.Ingester.TotalChunksMatched res.Ingester.TotalBatches += ing.Ingester.TotalBatches res.Ingester.TotalLinesSent += ing.Ingester.TotalLinesSent @@ -109,26 +109,27 @@ func decodeTrailers(ctx context.Context) Result { return res } -func decodeTrailer(meta *metadata.MD) Result { +func decodeTrailer(ctx context.Context, meta *metadata.MD) Result { + logger := util.WithContext(ctx, util.Logger) var ingData IngesterData values := meta.Get(ingesterDataKey) if len(values) == 1 { if err := jsoniter.UnmarshalFromString(values[0], &ingData); err != nil { - level.Warn(util.Logger).Log("msg", "could not unmarshal ingester data", "err", err) + level.Warn(logger).Log("msg", "could not unmarshal ingester data", "err", err) } } var chunkData ChunkData values = meta.Get(chunkDataKey) if len(values) == 1 { if err := jsoniter.UnmarshalFromString(values[0], &chunkData); err != nil { - level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err) + level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err) } } var storeData StoreData values = meta.Get(storeDataKey) if len(values) == 1 { if err := jsoniter.UnmarshalFromString(values[0], &storeData); err != nil { - level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err) + level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err) } } return Result{ diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 501f1cb66711..939619181c87 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -170,6 +170,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } + logger := util.WithContext(r.Context(), util.Logger) req, err := loghttp.ParseTailQuery(r) if err != nil { @@ -185,26 +186,26 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err) + level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err) return } defer func() { if err := conn.Close(); err != nil { - level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err) + level.Error(logger).Log("msg", "Error closing websocket", "err", err) } }() tailer, err := q.Tail(r.Context(), req) if err != nil { if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) + level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) } return } defer func() { if err := tailer.close(); err != nil { - level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err) + level.Error(logger).Log("msg", "Error closing Tailer", "err", err) } }() @@ -224,12 +225,12 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { if closeErr.Code == websocket.CloseNormalClosure { break } - level.Error(util.Logger).Log("msg", "Error from client", "err", err) + level.Error(logger).Log("msg", "Error from client", "err", err) break } else if tailer.stopped { return } else { - level.Error(util.Logger).Log("msg", "Unexpected error from client", "err", err) + level.Error(logger).Log("msg", "Unexpected error from client", "err", err) break } } @@ -247,25 +248,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { err = marshal_legacy.WriteTailResponseJSON(*response, conn) } if err != nil { - level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err) + level.Error(logger).Log("msg", "Error writing to websocket", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) + level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) } return } case err := <-closeErrChan: - level.Error(util.Logger).Log("msg", "Error from iterator", "err", err) + level.Error(logger).Log("msg", "Error from iterator", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) + level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) } return case <-ticker.C: // This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err) + level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) + level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err) } return } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 730cfcc6798b..b7cccee3d9a2 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -171,6 +171,10 @@ func (c *tailClientMock) CloseSend() error { return nil } +func (c *tailClientMock) Context() context.Context { + return context.Background() +} + func (c *tailClientMock) SendMsg(m interface{}) error { return nil } diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 5078d94dcb10..85538adf019b 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -10,7 +10,6 @@ import ( "time" "github.com/cortexproject/cortex/pkg/querier/queryrange" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/middleware" @@ -93,7 +92,7 @@ func StatsCollectorMiddleware() queryrange.Middleware { case *LokiPromResponse: statistics = &r.Statistics default: - level.Warn(util.Logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp)) + level.Warn(logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp)) } } if statistics != nil { diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 73f8e4576a58..7dd3d896b3cf 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -197,10 +197,12 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ var resp *logproto.TailResponse var err error defer t.dropTailClient(addr) + + logger := util.WithContext(querierTailClient.Context(), util.Logger) for { if t.stopped { if err := querierTailClient.CloseSend(); err != nil { - level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err) + level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err) } break } @@ -208,7 +210,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ if err != nil { // We don't want to log error when its due to stopping the tail request if !t.stopped { - level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err) + level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err) } break } diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 1fb96949b951..a29d78c527d3 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -127,7 +127,7 @@ func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, ba } err = next.Close() if err != nil { - level.Error(util.Logger).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err) + level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err) } return case res.next <- &struct {