Ensure trace propagation in our logs. #1977

Merged · 6 commits · Apr 27, 2020
9 changes: 9 additions & 0 deletions pkg/helpers/logerror.go
@@ -1,6 +1,8 @@
package helpers

import (
"context"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
)
@@ -11,3 +13,10 @@ func LogError(message string, f func() error) {
level.Error(util.Logger).Log("message", message, "error", err)
}
}

// LogErrorWithContext logs any error returned by f; useful when deferring Close etc.
func LogErrorWithContext(ctx context.Context, message string, f func() error) {
if err := f(); err != nil {
level.Error(util.WithContext(ctx, util.Logger)).Log("message", message, "error", err)
}
}
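
For readers new to the Cortex helper this PR leans on: util.WithContext wraps a go-kit logger with request-scoped identifiers pulled from the context (the trace ID and, in Cortex, the org ID), which is what actually carries the trace into each log line. A minimal sketch of the idea, not Loki code: it assumes the Jaeger tracer, and withTraceID is a hypothetical name.

```go
package main

import (
	"context"
	"os"

	"github.com/go-kit/kit/log"
	opentracing "github.com/opentracing/opentracing-go"
	jaeger "github.com/uber/jaeger-client-go"
)

// withTraceID sketches what a wrapper like util.WithContext does: copy the
// trace ID from the context onto the logger, so every line written through
// the returned logger carries it.
func withTraceID(ctx context.Context, logger log.Logger) log.Logger {
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		return logger // no active span, nothing to attach
	}
	// Extracting the ID is tracer-specific; this assumes jaeger-client-go.
	if sctx, ok := span.Context().(jaeger.SpanContext); ok {
		logger = log.With(logger, "traceID", sctx.TraceID().String())
	}
	return logger
}

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	// Without a span in the context this is a pass-through.
	_ = withTraceID(context.Background(), logger).Log("msg", "hello")
}
```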
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -298,7 +298,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie

heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)

defer helpers.LogError("closing iterator", heapItr.Close)
defer helpers.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)

return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit)
}
4 changes: 2 additions & 2 deletions pkg/ingester/stream.go
@@ -109,7 +109,7 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
return nil
}

func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
var lastChunkTimestamp time.Time
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
Expand Down Expand Up @@ -145,7 +145,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
if err != nil {
// This should be an unlikely situation; returning an error up the stack doesn't help much here,
// so instead log this to help debug the issue if it ever arises.
level.Error(util.Logger).Log("msg", "failed to Close chunk", "err", err)
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to Close chunk", "err", err)
}
chunk.closed = true

2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
@@ -94,7 +94,7 @@ func (t *tailer) loop() {
if err != nil {
// Don't log any error due to tail client closing the connection
if !util.IsConnCanceled(err) {
level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err)
level.Error(cortex_util.WithContext(t.conn.Context(), cortex_util.Logger)).Log("msg", "Error writing to tail client", "err", err)
}
t.close()
return
26 changes: 14 additions & 12 deletions pkg/ingester/transfer.go
@@ -36,6 +36,7 @@ var (
// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
logger := util.WithContext(stream.Context(), util.Logger)
// Prevent a shutdown from happening until we've completely finished a handoff
// from a leaving ingester.
i.shutdownMtx.Lock()
@@ -54,12 +55,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
return
}

level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)
level.Error(logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)

// Enter PENDING state (only valid from JOINING)
if i.lifecycler.GetState() == ring.JOINING {
if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil {
level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
level.Error(logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
os.Exit(1)
}
}
@@ -81,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
// this loop.
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
level.Info(logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
@@ -109,10 +110,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
}

if seriesReceived == 0 {
level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
level.Error(logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
return fmt.Errorf("no series")
} else if fromIngesterID == "" {
level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
level.Error(logger).Log("msg", "received TransferChunks request with no ID from ingester")
return fmt.Errorf("no ingester id")
}

@@ -127,10 +128,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
// Close the stream last, as this is what tells the "from" ingester that
// it's OK to shut down.
if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil {
level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
level.Error(logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
return err
}
level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
level.Info(logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
return nil
}

@@ -188,26 +189,27 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
return nil
}

level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "transfer failed", "err", err)
backoff.Wait()
}

return backoff.Err()
}

func (i *Ingester) transferOut(ctx context.Context) error {
logger := util.WithContext(ctx, util.Logger)
targetIngester, err := i.findTransferTarget(ctx)
if err != nil {
return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err)
}

level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
level.Info(logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr)
if err != nil {
return err
}
if c, ok := c.(io.Closer); ok {
defer helpers.LogError("closing client", c.Close)
defer helpers.LogErrorWithContext(ctx, "closing client", c.Close)
}
ic := c.(logproto.IngesterClient)

@@ -244,7 +246,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
FromIngesterId: i.lifecycler.ID,
})
if err != nil {
level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
return err
}

@@ -262,7 +264,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
}
i.flushQueuesDone.Wait()

level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
level.Info(logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
return nil
}

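
A style note on the two functions above: where a function logs more than once, the PR binds the context-aware logger a single time at the top (logger := util.WithContext(ctx, util.Logger)) instead of wrapping util.Logger at each call site, so no later line falls back to the bare global logger. A hedged sketch of that shape; doTransfer is a hypothetical stand-in for the real transfer logic:

```go
package main

import (
	"context"
	"errors"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
)

// doTransfer stands in for the real chunk-transfer logic.
func doTransfer(ctx context.Context) error { return errors.New("peer unavailable") }

// transferSketch binds the context-aware logger once, then reuses it, so
// every log line in the function carries the same trace and org IDs.
func transferSketch(ctx context.Context) error {
	logger := util.WithContext(ctx, util.Logger)
	if err := doTransfer(ctx); err != nil {
		level.Error(logger).Log("msg", "transfer failed", "err", err)
		return err
	}
	level.Info(logger).Log("msg", "transfer complete")
	return nil
}

func main() {
	_ = transferSketch(context.Background())
}
```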
4 changes: 2 additions & 2 deletions pkg/logql/engine.go
@@ -201,7 +201,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
if err != nil {
return nil, err
}
defer helpers.LogError("closing iterator", iter.Close)
defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
return streams, err
}
@@ -219,7 +219,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
if err != nil {
return nil, err
}
defer helpers.LogError("closing SampleExpr", stepEvaluator.Close)
defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

seriesIndex := map[uint64]*promql.Series{}

8 changes: 3 additions & 5 deletions pkg/logql/metrics.go
@@ -63,9 +63,10 @@ var (
)

func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
logger := util.WithContext(ctx, util.Logger)
queryType, err := QueryType(p.Query())
if err != nil {
level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err)
level.Warn(logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(GetRangeType(p))

@@ -77,10 +78,7 @@ }
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(
// ensure we have traceID & orgId
util.WithContext(ctx, util.Logger),
).Log(
level.Info(logger).Log(
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_type", queryType,
13 changes: 7 additions & 6 deletions pkg/logql/stats/grpc.go
@@ -47,7 +47,7 @@ func CollectTrailer(ctx context.Context) grpc.CallOption {
func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) {
trailer, err := encodeTrailer(ctx)
if err != nil {
level.Warn(util.Logger).Log("msg", "failed to encode trailer", "err", err)
level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "failed to encode trailer", "err", err)
return
}
stream.SetTrailer(trailer)
@@ -92,7 +92,7 @@ func decodeTrailers(ctx context.Context) Result {
}
res.Ingester.TotalReached = int32(len(collector.trailers))
for _, meta := range collector.trailers {
ing := decodeTrailer(meta)
ing := decodeTrailer(ctx, meta)
res.Ingester.TotalChunksMatched += ing.Ingester.TotalChunksMatched
res.Ingester.TotalBatches += ing.Ingester.TotalBatches
res.Ingester.TotalLinesSent += ing.Ingester.TotalLinesSent
@@ -109,26 +109,27 @@ func decodeTrailers(ctx context.Context) Result {
return res
}

func decodeTrailer(meta *metadata.MD) Result {
func decodeTrailer(ctx context.Context, meta *metadata.MD) Result {
logger := util.WithContext(ctx, util.Logger)
var ingData IngesterData
values := meta.Get(ingesterDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &ingData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal ingester data", "err", err)
level.Warn(logger).Log("msg", "could not unmarshal ingester data", "err", err)
}
}
var chunkData ChunkData
values = meta.Get(chunkDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &chunkData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
var storeData StoreData
values = meta.Get(storeDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &storeData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
level.Warn(logger).Log("msg", "could not unmarshal store data", "err", err)
}
}
return Result{
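
The decodeTrailer change is the other recurring move in this PR: a helper that logs must now take the request context, because the trace ID lives only there. A hedged sketch of the same pattern with a hypothetical payload; only util.WithContext, util.Logger, and jsoniter.UnmarshalFromString are real APIs here:

```go
package statsexample

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
	jsoniter "github.com/json-iterator/go"
)

// decodeCounts mirrors decodeTrailer above: it accepts ctx purely so that
// its warning, if any, is logged with the caller's trace ID attached.
func decodeCounts(ctx context.Context, raw string) map[string]int64 {
	logger := util.WithContext(ctx, util.Logger)
	counts := map[string]int64{}
	if err := jsoniter.UnmarshalFromString(raw, &counts); err != nil {
		level.Warn(logger).Log("msg", "could not unmarshal counts", "err", err)
	}
	return counts
}
```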
25 changes: 13 additions & 12 deletions pkg/querier/http.go
@@ -170,6 +170,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
logger := util.WithContext(r.Context(), util.Logger)

req, err := loghttp.ParseTailQuery(r)
if err != nil {
@@ -185,26 +186,26 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}

defer func() {
if err := conn.Close(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err)
level.Error(logger).Log("msg", "Error closing websocket", "err", err)
}
}()

tailer, err := q.Tail(r.Context(), req)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
level.Error(logger).Log("msg", "Error closing Tailer", "err", err)
}
}()

@@ -224,12 +225,12 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
if closeErr.Code == websocket.CloseNormalClosure {
break
}
level.Error(util.Logger).Log("msg", "Error from client", "err", err)
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped {
return
} else {
level.Error(util.Logger).Log("msg", "Unexpected error from client", "err", err)
level.Error(logger).Log("msg", "Unexpected error from client", "err", err)
break
}
}
@@ -247,25 +248,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

case err := <-closeErrChan:
level.Error(util.Logger).Log("msg", "Error from iterator", "err", err)
level.Error(logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
4 changes: 4 additions & 0 deletions pkg/querier/querier_mock_test.go
@@ -171,6 +171,10 @@ func (c *tailClientMock) CloseSend() error {
return nil
}

func (c *tailClientMock) Context() context.Context {
return context.Background()
}

func (c *tailClientMock) SendMsg(m interface{}) error {
return nil
}
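
Why the mock gains a Context() method: logproto.Querier_TailClient embeds grpc.ClientStream, whose interface includes Context() context.Context, and readTailClient in pkg/querier/tail.go (below) now calls it to build its logger. Returning context.Background() is enough here, since the tests do not assert on trace IDs.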
3 changes: 1 addition & 2 deletions pkg/querier/queryrange/stats.go
@@ -10,7 +10,6 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/middleware"
@@ -93,7 +92,7 @@ func StatsCollectorMiddleware() queryrange.Middleware {
case *LokiPromResponse:
statistics = &r.Statistics
default:
level.Warn(util.Logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
level.Warn(logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
}
}
if statistics != nil {
6 changes: 4 additions & 2 deletions pkg/querier/tail.go
@@ -197,18 +197,20 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
var resp *logproto.TailResponse
var err error
defer t.dropTailClient(addr)

logger := util.WithContext(querierTailClient.Context(), util.Logger)
for {
if t.stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err)
level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err)
}
break
}
resp, err = querierTailClient.Recv()
if err != nil {
// We don't want to log an error when it's due to stopping the tail request
if !t.stopped {
level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
}