Ensure trace propagation in our logs. (#1977)
* Ensure trace propagation in our logs.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add missing import.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix function usage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
cyriltovena authored Apr 27, 2020
1 parent c7a1a1f commit 577d8eb
Showing 13 changed files with 62 additions and 46 deletions.
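The commit applies one pattern across these files: wherever a request-scoped context is available, the bare util.Logger is wrapped with util.WithContext(ctx, util.Logger) before logging, so identifiers carried by the context (such as the trace ID and org ID) are attached to the log line and logs can be correlated with traces. A minimal before/after sketch, assuming Cortex's util.WithContext behaves as just described (the function handleRequest is an illustration, not part of the diff):

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
)

func handleRequest(ctx context.Context) {
	// Before: the bare global logger knows nothing about the request.
	level.Info(util.Logger).Log("msg", "handling request")

	// After: decorate the logger with the request context first, so the
	// resulting line carries the trace/org identifiers stored in ctx.
	logger := util.WithContext(ctx, util.Logger)
	level.Info(logger).Log("msg", "handling request")
}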
9 changes: 9 additions & 0 deletions pkg/helpers/logerror.go
@@ -1,6 +1,8 @@
package helpers

import (
"context"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
)
@@ -11,3 +13,10 @@ func LogError(message string, f func() error) {
level.Error(util.Logger).Log("message", message, "error", err)
}
}

// LogErrorWithContext logs any error returned by f; useful when deferring Close etc.
func LogErrorWithContext(ctx context.Context, message string, f func() error) {
if err := f(); err != nil {
level.Error(util.WithContext(ctx, util.Logger)).Log("message", message, "error", err)
}
}
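
A hypothetical caller (not part of this diff) showing how the new helper is meant to be used for deferred cleanup; the module path github.com/grafana/loki and the function readAndClose are assumptions for illustration:

package example

import (
	"context"
	"io"
	"io/ioutil"

	"github.com/grafana/loki/pkg/helpers"
)

func readAndClose(ctx context.Context, rc io.ReadCloser) ([]byte, error) {
	// If Close fails, the error is logged via util.WithContext(ctx, ...),
	// so it carries the same trace/org IDs as the rest of the request's logs.
	defer helpers.LogErrorWithContext(ctx, "closing reader", rc.Close)
	return ioutil.ReadAll(rc)
}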
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -298,7 +298,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie

heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)

defer helpers.LogError("closing iterator", heapItr.Close)
defer helpers.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)

return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit)
}
4 changes: 2 additions & 2 deletions pkg/ingester/stream.go
@@ -109,7 +109,7 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
return nil
}

func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
var lastChunkTimestamp time.Time
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
@@ -145,7 +145,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
if err != nil {
// This should be an unlikely situation, returning an error up the stack doesn't help much here
// so instead log this to help debug the issue if it ever arises.
level.Error(util.Logger).Log("msg", "failed to Close chunk", "err", err)
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to Close chunk", "err", err)
}
chunk.closed = true

2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
@@ -94,7 +94,7 @@ func (t *tailer) loop() {
if err != nil {
// Don't log any error due to tail client closing the connection
if !util.IsConnCanceled(err) {
level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err)
level.Error(cortex_util.WithContext(t.conn.Context(), cortex_util.Logger)).Log("msg", "Error writing to tail client", "err", err)
}
t.close()
return
26 changes: 14 additions & 12 deletions pkg/ingester/transfer.go
@@ -36,6 +36,7 @@ var (
// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
logger := util.WithContext(stream.Context(), util.Logger)
// Prevent a shutdown from happening until we've completely finished a handoff
// from a leaving ingester.
i.shutdownMtx.Lock()
@@ -54,12 +55,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
return
}

level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)
level.Error(logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)

// Enter PENDING state (only valid from JOINING)
if i.lifecycler.GetState() == ring.JOINING {
if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil {
level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
level.Error(logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
os.Exit(1)
}
}
@@ -81,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
// this loop.
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
level.Info(logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
@@ -109,10 +110,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
}

if seriesReceived == 0 {
level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
level.Error(logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
return fmt.Errorf("no series")
} else if fromIngesterID == "" {
level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
level.Error(logger).Log("msg", "received TransferChunks request with no ID from ingester")
return fmt.Errorf("no ingester id")
}

@@ -127,10 +128,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
// Close the stream last, as this is what tells the "from" ingester that
// it's OK to shut down.
if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil {
level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
level.Error(logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
return err
}
level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
level.Info(logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
return nil
}

@@ -188,26 +189,27 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
return nil
}

level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "transfer failed", "err", err)
backoff.Wait()
}

return backoff.Err()
}

func (i *Ingester) transferOut(ctx context.Context) error {
logger := util.WithContext(ctx, util.Logger)
targetIngester, err := i.findTransferTarget(ctx)
if err != nil {
return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err)
}

level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
level.Info(logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr)
if err != nil {
return err
}
if c, ok := c.(io.Closer); ok {
defer helpers.LogError("closing client", c.Close)
defer helpers.LogErrorWithContext(ctx, "closing client", c.Close)
}
ic := c.(logproto.IngesterClient)

Expand Down Expand Up @@ -244,7 +246,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
FromIngesterId: i.lifecycler.ID,
})
if err != nil {
level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
return err
}

Expand All @@ -262,7 +264,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
}
i.flushQueuesDone.Wait()

level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
level.Info(logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
return nil
}

4 changes: 2 additions & 2 deletions pkg/logql/engine.go
@@ -201,7 +201,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
if err != nil {
return nil, err
}
defer helpers.LogError("closing iterator", iter.Close)
defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
return streams, err
}
@@ -219,7 +219,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
if err != nil {
return nil, err
}
defer helpers.LogError("closing SampleExpr", stepEvaluator.Close)
defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

seriesIndex := map[uint64]*promql.Series{}

8 changes: 3 additions & 5 deletions pkg/logql/metrics.go
@@ -63,9 +63,10 @@ var (
)

func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
logger := util.WithContext(ctx, util.Logger)
queryType, err := QueryType(p.Query())
if err != nil {
level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err)
level.Warn(logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(GetRangeType(p))

@@ -77,10 +78,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(
// ensure we have traceID & orgId
util.WithContext(ctx, util.Logger),
).Log(
level.Info(logger).Log(
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_type", queryType,
13 changes: 7 additions & 6 deletions pkg/logql/stats/grpc.go
@@ -47,7 +47,7 @@ func CollectTrailer(ctx context.Context) grpc.CallOption {
func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) {
trailer, err := encodeTrailer(ctx)
if err != nil {
level.Warn(util.Logger).Log("msg", "failed to encode trailer", "err", err)
level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "failed to encode trailer", "err", err)
return
}
stream.SetTrailer(trailer)
@@ -92,7 +92,7 @@ func decodeTrailers(ctx context.Context) Result {
}
res.Ingester.TotalReached = int32(len(collector.trailers))
for _, meta := range collector.trailers {
ing := decodeTrailer(meta)
ing := decodeTrailer(ctx, meta)
res.Ingester.TotalChunksMatched += ing.Ingester.TotalChunksMatched
res.Ingester.TotalBatches += ing.Ingester.TotalBatches
res.Ingester.TotalLinesSent += ing.Ingester.TotalLinesSent
@@ -109,26 +109,27 @@ func decodeTrailers(ctx context.Context) Result {
return res
}

func decodeTrailer(meta *metadata.MD) Result {
func decodeTrailer(ctx context.Context, meta *metadata.MD) Result {
logger := util.WithContext(ctx, util.Logger)
var ingData IngesterData
values := meta.Get(ingesterDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &ingData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal ingester data", "err", err)
level.Warn(logger).Log("msg", "could not unmarshal ingester data", "err", err)
}
}
var chunkData ChunkData
values = meta.Get(chunkDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &chunkData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
var storeData StoreData
values = meta.Get(storeDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &storeData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
return Result{
25 changes: 13 additions & 12 deletions pkg/querier/http.go
@@ -170,6 +170,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
logger := util.WithContext(r.Context(), util.Logger)

req, err := loghttp.ParseTailQuery(r)
if err != nil {
@@ -185,26 +186,26 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}

defer func() {
if err := conn.Close(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err)
level.Error(logger).Log("msg", "Error closing websocket", "err", err)
}
}()

tailer, err := q.Tail(r.Context(), req)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
level.Error(logger).Log("msg", "Error closing Tailer", "err", err)
}
}()

@@ -224,12 +225,12 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
if closeErr.Code == websocket.CloseNormalClosure {
break
}
level.Error(util.Logger).Log("msg", "Error from client", "err", err)
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped {
return
} else {
level.Error(util.Logger).Log("msg", "Unexpected error from client", "err", err)
level.Error(logger).Log("msg", "Unexpected error from client", "err", err)
break
}
}
@@ -247,25 +248,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

case err := <-closeErrChan:
level.Error(util.Logger).Log("msg", "Error from iterator", "err", err)
level.Error(logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
4 changes: 4 additions & 0 deletions pkg/querier/querier_mock_test.go
@@ -171,6 +171,10 @@ func (c *tailClientMock) CloseSend() error {
return nil
}

func (c *tailClientMock) Context() context.Context {
return context.Background()
}

func (c *tailClientMock) SendMsg(m interface{}) error {
return nil
}
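
The mock gains a Context() method because tailer.loop now builds its logger from the tail client's connection context; returning context.Background() is enough to satisfy the interface. A hypothetical variant (an assumption, not part of this commit) for a test that wants to assert the propagated org ID, using "github.com/weaveworks/common/user":

// Hypothetical alternative: return a context that already carries an org ID,
// so util.WithContext has something to attach to log lines produced through
// this connection.
func (c *tailClientMock) Context() context.Context {
	return user.InjectOrgID(context.Background(), "test-org")
}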
3 changes: 1 addition & 2 deletions pkg/querier/queryrange/stats.go
@@ -10,7 +10,6 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/middleware"
@@ -93,7 +92,7 @@ func StatsCollectorMiddleware() queryrange.Middleware {
case *LokiPromResponse:
statistics = &r.Statistics
default:
level.Warn(util.Logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
level.Warn(logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
}
}
if statistics != nil {
6 changes: 4 additions & 2 deletions pkg/querier/tail.go
@@ -197,18 +197,20 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
var resp *logproto.TailResponse
var err error
defer t.dropTailClient(addr)

logger := util.WithContext(querierTailClient.Context(), util.Logger)
for {
if t.stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err)
level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err)
}
break
}
resp, err = querierTailClient.Recv()
if err != nil {
// We don't want to log an error when it's due to stopping the tail request
if !t.stopped {
level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
}