From fb71eec18f4215c093df3b929decc00a360dd5ca Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Thu, 10 Feb 2022 22:56:37 -0500 Subject: [PATCH 1/2] client: add new metric instrumentation for reads & appends Give particular focus to fragment reads, surfacing the number of client-initiated fragment reads and related size metrics. --- broker/client/appender.go | 10 ++++++--- broker/client/doc.go | 36 ++++++++++++++++++++++++++++++++ broker/client/reader.go | 39 +++++++++++++++++++++++++++++------ broker/client/retry_reader.go | 1 + 4 files changed, 77 insertions(+), 9 deletions(-) diff --git a/broker/client/appender.go b/broker/client/appender.go index 4bf5530b..a9bc6b12 100644 --- a/broker/client/appender.go +++ b/broker/client/appender.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" pb "go.gazette.dev/core/broker/protocol" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -29,9 +30,10 @@ type Appender struct { Request pb.AppendRequest // AppendRequest of the Append. Response pb.AppendResponse // AppendResponse sent by broker. - ctx context.Context - client pb.RoutedJournalClient // Client against which Read is dispatched. - stream pb.Journal_AppendClient // Server stream. + ctx context.Context + client pb.RoutedJournalClient // Client against which Append is dispatched. + counter prometheus.Counter // Counter of appended bytes. + stream pb.Journal_AppendClient // Server stream. } // NewAppender returns an initialized Appender of the given AppendRequest. 
@@ -40,6 +42,7 @@ func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.Appe Request: req, ctx: ctx, client: client, + counter: appendBytes.WithLabelValues(req.Journal.String()), } return a } @@ -66,6 +69,7 @@ func (a *Appender) Write(p []byte) (n int, err error) { if err != nil { err = mapGRPCCtxErr(a.ctx, err) } + a.counter.Add(float64(n)) return } diff --git a/broker/client/doc.go b/broker/client/doc.go index 6a9a568d..a8f80e3a 100644 --- a/broker/client/doc.go +++ b/broker/client/doc.go @@ -59,3 +59,39 @@ // PolledList, which is an important building-block for applications scaling to // multiple journals. package client + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + appendBytes = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_append_bytes_total", + Help: "Total number of journal bytes appended.", + }, []string{"journal"}) + readBytes = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_read_bytes_total", + Help: "Total number of journal bytes read.", + }, []string{"journal"}) + fragmentOpen = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_fragment_open_total", + Help: "Total number of journal fragments opened for reading.", + }, []string{"journal", "codec"}) + fragmentOpenBytes = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_fragment_open_bytes_total", + Help: "Aggregate uncompressed bytes of journal fragments opened for reading.", + }, []string{"journal", "codec"}) + fragmentOpenContentLength = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_fragment_open_content_length_total", + Help: "Aggregate content-length of journal fragments opened for reading.", + }, []string{"journal", "codec"}) + readFragmentBytes = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_read_fragment_bytes_total", + Help: "Total number of journal fragment bytes read, excluding 
discarded bytes.", + }, []string{"journal", "codec"}) + discardFragmentBytes = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "gazette_discard_fragment_bytes_total", + Help: "Total number of journal fragment bytes discarded while seeking to desired offset.", + }, []string{"journal", "codec"}) +) diff --git a/broker/client/reader.go b/broker/client/reader.go index 082e0999..fc1e5583 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "net/http" + "github.com/prometheus/client_golang/prometheus" "go.gazette.dev/core/broker/codecs" pb "go.gazette.dev/core/broker/protocol" "google.golang.org/grpc/codes" @@ -39,10 +40,11 @@ type Reader struct { Request pb.ReadRequest // ReadRequest of the Reader. Response pb.ReadResponse // Most recent ReadResponse from broker. - ctx context.Context - client pb.RoutedJournalClient // Client against which Read is dispatched. - stream pb.Journal_ReadClient // Server stream. - direct io.ReadCloser // Directly opened Fragment URL. + ctx context.Context + client pb.RoutedJournalClient // Client against which Read is dispatched. + counter prometheus.Counter // Counter of read bytes. + stream pb.Journal_ReadClient // Server stream. + direct io.ReadCloser // Directly opened Fragment URL. } // NewReader returns an initialized Reader of the given ReadRequest. 
@@ -51,6 +53,7 @@ func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRe Request: req, ctx: ctx, client: client, + counter: readBytes.WithLabelValues(req.Journal.String()), } return r } @@ -76,6 +79,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { _ = r.direct.Close() } r.Request.Offset += int64(n) + r.counter.Add(float64(n)) return } @@ -83,6 +87,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { if l, d := len(r.Response.Content), int(r.Request.Offset-r.Response.Offset); l != 0 && l > d { n = copy(p, r.Response.Content[d:]) r.Request.Offset += int64(n) + r.counter.Add(float64(n)) return } @@ -255,6 +260,15 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur fragment.CompressionCodec = pb.CompressionCodec_GZIP // Decompress client-side. } + + // Record metrics related to opening the fragment. + var labels = fragmentLabels(fragment) + fragmentOpen.With(labels).Inc() + fragmentOpenBytes.With(labels).Add(float64(fragment.End - fragment.Begin)) + if resp.ContentLength != -1 { + fragmentOpenContentLength.With(labels).Add(float64(resp.ContentLength)) + } + return NewFragmentReader(resp.Body, fragment, offset) } @@ -272,6 +286,7 @@ func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*F raw: rc, Fragment: fragment, Offset: fragment.Begin, + counter: discardFragmentBytes.With(fragmentLabels(fragment)), } // Attempt to seek to |offset| within the fragment. @@ -280,6 +295,9 @@ func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*F _ = fr.Close() return nil, err } + // We've finished discarding required bytes. + fr.counter = readFragmentBytes.With(fragmentLabels(fragment)) + return fr, nil } @@ -288,8 +306,9 @@ type FragmentReader struct { pb.Fragment // Fragment being read. Offset int64 // Next journal offset to be read, in range [Begin, End). 
- decomp io.ReadCloser - raw io.ReadCloser + decomp io.ReadCloser + raw io.ReadCloser + counter prometheus.Counter } // Read returns the next bytes of decompressed Fragment content. When Read @@ -310,6 +329,7 @@ func (fr *FragmentReader) Read(p []byte) (n int, err error) { // Did we read EOF before the reaching Fragment.End? err = io.ErrUnexpectedEOF } + fr.counter.Add(float64(n)) return } @@ -362,6 +382,13 @@ func mapGRPCCtxErr(ctx context.Context, err error) error { return err } +func fragmentLabels(fragment pb.Fragment) prometheus.Labels { + return prometheus.Labels{ + "journal": fragment.Journal.String(), + "codec": fragment.CompressionCodec.String(), + } +} + var ( // Map common broker error statuses into named errors. ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String()) diff --git a/broker/client/retry_reader.go b/broker/client/retry_reader.go index 96c5a1b4..a0e5f4a4 100644 --- a/broker/client/retry_reader.go +++ b/broker/client/retry_reader.go @@ -84,6 +84,7 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) { Response: rr.Reader.Response, ctx: rr.Reader.ctx, client: rr.Reader.client, + counter: rr.Reader.counter, } var squelch bool From e7c18c1b78cc9b0b13effa8d2350c2943f48b85c Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Sat, 12 Feb 2022 10:13:22 -0500 Subject: [PATCH 2/2] consumer: add pprof.Labels to shard service routines --- consumer/shard.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/consumer/shard.go b/consumer/shard.go index daf4dab2..feb8eaf1 100644 --- a/consumer/shard.go +++ b/consumer/shard.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "runtime/pprof" "sync" "time" @@ -73,7 +74,12 @@ type shard struct { } func newShard(svc *Service, item keyspace.KeyValue) *shard { - var ctx, cancel = context.WithCancel(context.Background()) + var spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec) + + var ctx, cancel = context.WithCancel( + 
pprof.WithLabels(context.Background(), pprof.Labels( + "shard", spec.Id.String(), + ))) var s = &shard{ svc: svc, @@ -84,7 +90,7 @@ func newShard(svc *Service, item keyspace.KeyValue) *shard { primary: client.NewAsyncOperation(), } s.resolved.fqn = string(item.Raw.Key) - s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec) + s.resolved.spec = spec s.resolved.RWMutex = &svc.State.KS.Mu // We grab this value only once (since RecoveryLog()'s value may change in @@ -166,6 +172,8 @@ var transition = func(s *shard, item, assignment keyspace.KeyValue) { // serveStandby recovers and tails the shard recovery log, until the Replica is // cancelled or promoted to primary. func serveStandby(s *shard) (err error) { + pprof.SetGoroutineLabels(s.ctx) + // Defer a trap which logs and updates Etcd status based on exit error. defer func() { if err != nil && s.ctx.Err() == nil { @@ -210,6 +218,8 @@ func serveStandby(s *shard) (err error) { // servePrimary completes playback of the recovery log, // performs initialization, and runs consumer transactions. func servePrimary(s *shard) (err error) { + pprof.SetGoroutineLabels(s.ctx) + // Defer a trap which logs and updates Etcd status based on exit error. defer func() { s.primary.Resolve(err)