Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add new metric instrumentation for reads & appends #317

Merged
merged 2 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions broker/client/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -29,9 +30,10 @@ type Appender struct {
Request pb.AppendRequest // AppendRequest of the Append.
Response pb.AppendResponse // AppendResponse sent by broker.

ctx context.Context
client pb.RoutedJournalClient // Client against which Read is dispatched.
stream pb.Journal_AppendClient // Server stream.
ctx context.Context
client pb.RoutedJournalClient // Client against which Read is dispatched.
counter prometheus.Counter // Counter of appended bytes.
stream pb.Journal_AppendClient // Server stream.
}

// NewAppender returns an initialized Appender of the given AppendRequest.
Expand All @@ -40,6 +42,7 @@ func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.Appe
Request: req,
ctx: ctx,
client: client,
counter: appendBytes.WithLabelValues(req.Journal.String()),
}
return a
}
Expand All @@ -66,6 +69,7 @@ func (a *Appender) Write(p []byte) (n int, err error) {
if err != nil {
err = mapGRPCCtxErr(a.ctx, err)
}
a.counter.Add(float64(n))
return
}

Expand Down
36 changes: 36 additions & 0 deletions broker/client/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,39 @@
// PolledList, which is an important building-block for applications scaling to
// multiple journals.
package client

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
appendBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_append_bytes_total",
Help: "Total number of journal bytes appended.",
}, []string{"journal"})
readBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_read_bytes_total",
Help: "Total number of journal bytes read.",
}, []string{"journal"})
fragmentOpen = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_fragment_open_total",
Help: "Total number of journal fragments opened for reading.",
}, []string{"journal", "codec"})
fragmentOpenBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_fragment_open_bytes_total",
Help: "Aggregate uncompressed bytes of journal fragments opened for reading.",
}, []string{"journal", "codec"})
fragmentOpenContentLength = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_fragment_open_content_length_total",
Help: "Aggregate content-length of journal fragments opened for reading.",
}, []string{"journal", "codec"})
readFragmentBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_read_fragment_bytes_total",
Help: "Total number of journal fragment bytes read, excluding discarded bytes.",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably be good for the description here to explicitly state that this is uncompressed, because I could see somebody reading "fragment bytes" as meaning "bytes of compressed fragment data from cloud storage".

}, []string{"journal", "codec"})
discardFragmentBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "gazette_discard_fragment_bytes_total",
Help: "Total number of journal fragment bytes discarded while seeking to desired offset.",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here.

}, []string{"journal", "codec"})
)
39 changes: 33 additions & 6 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"net/http"

"github.com/prometheus/client_golang/prometheus"
"go.gazette.dev/core/broker/codecs"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -39,10 +40,11 @@ type Reader struct {
Request pb.ReadRequest // ReadRequest of the Reader.
Response pb.ReadResponse // Most recent ReadResponse from broker.

ctx context.Context
client pb.RoutedJournalClient // Client against which Read is dispatched.
stream pb.Journal_ReadClient // Server stream.
direct io.ReadCloser // Directly opened Fragment URL.
ctx context.Context
client pb.RoutedJournalClient // Client against which Read is dispatched.
counter prometheus.Counter // Counter of read bytes.
stream pb.Journal_ReadClient // Server stream.
direct io.ReadCloser // Directly opened Fragment URL.
}

// NewReader returns an initialized Reader of the given ReadRequest.
Expand All @@ -51,6 +53,7 @@ func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRe
Request: req,
ctx: ctx,
client: client,
counter: readBytes.WithLabelValues(req.Journal.String()),
}
return r
}
Expand All @@ -76,13 +79,15 @@ func (r *Reader) Read(p []byte) (n int, err error) {
_ = r.direct.Close()
}
r.Request.Offset += int64(n)
r.counter.Add(float64(n))
return
}

// Is there remaining content in the last ReadResponse?
if l, d := len(r.Response.Content), int(r.Request.Offset-r.Response.Offset); l != 0 && l > d {
n = copy(p, r.Response.Content[d:])
r.Request.Offset += int64(n)
r.counter.Add(float64(n))
return
}

Expand Down Expand Up @@ -255,6 +260,15 @@ func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, ur

fragment.CompressionCodec = pb.CompressionCodec_GZIP // Decompress client-side.
}

// Record metrics related to opening the fragment.
var labels = fragmentLabels(fragment)
fragmentOpen.With(labels).Inc()
fragmentOpenBytes.With(labels).Add(float64(fragment.End - fragment.Begin))
if resp.ContentLength != -1 {
fragmentOpenContentLength.With(labels).Add(float64(resp.ContentLength))
}

return NewFragmentReader(resp.Body, fragment, offset)
}

Expand All @@ -272,6 +286,7 @@ func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*F
raw: rc,
Fragment: fragment,
Offset: fragment.Begin,
counter: discardFragmentBytes.With(fragmentLabels(fragment)),
}

// Attempt to seek to |offset| within the fragment.
Expand All @@ -280,6 +295,9 @@ func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*F
_ = fr.Close()
return nil, err
}
// We've finished discarding required bytes.
fr.counter = readFragmentBytes.With(fragmentLabels(fragment))

return fr, nil
}

Expand All @@ -288,8 +306,9 @@ type FragmentReader struct {
pb.Fragment // Fragment being read.
Offset int64 // Next journal offset to be read, in range [Begin, End).

decomp io.ReadCloser
raw io.ReadCloser
decomp io.ReadCloser
raw io.ReadCloser
counter prometheus.Counter
}

// Read returns the next bytes of decompressed Fragment content. When Read
Expand All @@ -310,6 +329,7 @@ func (fr *FragmentReader) Read(p []byte) (n int, err error) {
// Did we read EOF before the reaching Fragment.End?
err = io.ErrUnexpectedEOF
}
fr.counter.Add(float64(n))
return
}

Expand Down Expand Up @@ -362,6 +382,13 @@ func mapGRPCCtxErr(ctx context.Context, err error) error {
return err
}

func fragmentLabels(fragment pb.Fragment) prometheus.Labels {
return prometheus.Labels{
"journal": fragment.Journal.String(),
"codec": fragment.CompressionCodec.String(),
}
}

var (
// Map common broker error statuses into named errors.
ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String())
Expand Down
1 change: 1 addition & 0 deletions broker/client/retry_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) {
Response: rr.Reader.Response,
ctx: rr.Reader.ctx,
client: rr.Reader.client,
counter: rr.Reader.counter,
}

var squelch bool
Expand Down
14 changes: 12 additions & 2 deletions consumer/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumer

import (
"context"
"runtime/pprof"
"sync"
"time"

Expand Down Expand Up @@ -73,7 +74,12 @@ type shard struct {
}

func newShard(svc *Service, item keyspace.KeyValue) *shard {
var ctx, cancel = context.WithCancel(context.Background())
var spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)

var ctx, cancel = context.WithCancel(
pprof.WithLabels(context.Background(), pprof.Labels(
"shard", spec.Id.String(),
)))

var s = &shard{
svc: svc,
Expand All @@ -84,7 +90,7 @@ func newShard(svc *Service, item keyspace.KeyValue) *shard {
primary: client.NewAsyncOperation(),
}
s.resolved.fqn = string(item.Raw.Key)
s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)
s.resolved.spec = spec
s.resolved.RWMutex = &svc.State.KS.Mu

// We grab this value only once (since RecoveryLog()'s value may change in
Expand Down Expand Up @@ -166,6 +172,8 @@ var transition = func(s *shard, item, assignment keyspace.KeyValue) {
// serveStandby recovers and tails the shard recovery log, until the Replica is
// cancelled or promoted to primary.
func serveStandby(s *shard) (err error) {
pprof.SetGoroutineLabels(s.ctx)

// Defer a trap which logs and updates Etcd status based on exit error.
defer func() {
if err != nil && s.ctx.Err() == nil {
Expand Down Expand Up @@ -210,6 +218,8 @@ func serveStandby(s *shard) (err error) {
// servePrimary completes playback of the recovery log,
// performs initialization, and runs consumer transactions.
func servePrimary(s *shard) (err error) {
pprof.SetGoroutineLabels(s.ctx)

// Defer a trap which logs and updates Etcd status based on exit error.
defer func() {
s.primary.Resolve(err)
Expand Down