Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
70502: util/tracing: simplify log messages in trace recordings r=andreimatei a=andreimatei

Before this patch, the RecordedSpan proto stored log messages in a very
awkward way: each message was stored as a collection of key/values, with
only one such pair present (using a well-known key). This was confusing,
unnecessary, hard to work with and hard to track for figuring out what
keys and values are in there (with the answer being only one key). This
patch simplifies the log messages, making them be represented by a
single string as nature intended. A bunch of code gets simplified in
consequence.

Release note: None

70539: base: optimize {NodeIDContainer,StoreIDContainer}.String() r=andreimatei a=andreimatei

These String() methods were implemented in terms of their respective
SafeFormat, which was pretty expensive: upwards of 750ns and between 4-7
allocations depending on the node id. This cost caused at least two
workarounds, that the patch annotates.

The patch makes stringifying cheap by precomputing the value and moving
from SafeFormatter to SafeValue. Moving away from SafeFormatter to a
more down-to-earth implementation brings the cost down to between 0 and
1 allocations. But I went further and precomputed the value because
these containers are used as logging tags and so can easily end up
being stringified very frequently.

Release note: None

70647: changefeedccl: Add metrics to changefeed throttle. r=miretskiy a=miretskiy

Add metrics to changefeed traffic throttler.

Release Justification: Small observability changes to the existing functionality.
Release Notes: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
  • Loading branch information
3 people committed Sep 30, 2021
4 parents bd732cb + 6a3af83 + 295689b + f2f824c commit 2e11a18
Show file tree
Hide file tree
Showing 38 changed files with 560 additions and 448 deletions.
2 changes: 2 additions & 0 deletions docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ The following types are considered always safe for reporting:

File | Type
--|--
pkg/base/node_id.go | `*NodeIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/kvserver/closedts/ctpb/service.go | `LAI`
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.1
github.com/opennota/wd v0.0.0-20180911144301-b446539ab1e7 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5
github.com/pierrre/geohash v1.0.0
Expand Down
63 changes: 37 additions & 26 deletions pkg/base/node_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,29 @@ import (
type NodeIDContainer struct {
_ util.NoCopy

// nodeID is atomically updated under the mutex; it can be read atomically
// without the mutex.
// nodeID is accessed atomically.
nodeID int32

// If nodeID has been set, str represents nodeID converted to string. We
// precompute this value to speed up String() and keep it from allocating
// memory dynamically.
str atomic.Value
}

// String returns the node ID, or "?" if it is unset.
func (n *NodeIDContainer) String() string {
return redact.StringWithoutMarkers(n)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (n *NodeIDContainer) SafeFormat(w redact.SafePrinter, _ rune) {
val := n.Get()
if val == 0 {
w.SafeRune('?')
} else {
w.Print(val)
s := n.str.Load()
if s == nil {
return "?"
}
return s.(string)
}

var _ redact.SafeValue = &NodeIDContainer{}

// SafeValue implements the redact.SafeValue interface.
func (n *NodeIDContainer) SafeValue() {}

// Get returns the current node ID; 0 if it is unset.
func (n *NodeIDContainer) Get() roachpb.NodeID {
return roachpb.NodeID(atomic.LoadInt32(&n.nodeID))
Expand All @@ -67,13 +70,15 @@ func (n *NodeIDContainer) Set(ctx context.Context, val roachpb.NodeID) {
} else if oldVal != int32(val) {
log.Fatalf(ctx, "different NodeIDs set: %d, then %d", oldVal, val)
}
n.str.Store(strconv.Itoa(int(val)))
}

// Reset changes the NodeID regardless of the old value.
//
// Should only be used in testing code.
func (n *NodeIDContainer) Reset(val roachpb.NodeID) {
atomic.StoreInt32(&n.nodeID, int32(val))
n.str.Store(strconv.Itoa(int(val)))
}

// StoreIDContainer is added as a logtag in the pebbleLogger's context.
Expand All @@ -83,9 +88,13 @@ func (n *NodeIDContainer) Reset(val roachpb.NodeID) {
type StoreIDContainer struct {
_ util.NoCopy

// After the struct is initially created, storeID is atomically
// updated under the mutex; it can be read atomically without the mutex.
// storeID is accessed atomically.
storeID int32

// If storeID has been set, str represents storeID converted to string. We
// precompute this value to speed up String() and keep it from allocating
// memory dynamically.
str atomic.Value
}

// TempStoreID is used as the store id for a temp pebble engine's log
Expand All @@ -95,21 +104,18 @@ const TempStoreID = -1
// stores if they haven't been initialized. If a main store hasn't
// been initialized, then "?" is returned.
func (s *StoreIDContainer) String() string {
return redact.StringWithoutMarkers(s)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (s *StoreIDContainer) SafeFormat(w redact.SafePrinter, _ rune) {
val := s.Get()
if val == 0 {
w.SafeRune('?')
} else if val == TempStoreID {
w.Print("temp")
} else {
w.Print(val)
str := s.str.Load()
if str == nil {
return "?"
}
return str.(string)
}

var _ redact.SafeValue = &StoreIDContainer{}

// SafeValue implements the redact.SafeValue interface.
func (s *StoreIDContainer) SafeValue() {}

// Get returns the current storeID; 0 if it is unset.
func (s *StoreIDContainer) Get() int32 {
return atomic.LoadInt32(&s.storeID)
Expand All @@ -133,6 +139,11 @@ func (s *StoreIDContainer) Set(ctx context.Context, val int32) {
oldVal, val)
}
}
if val == TempStoreID {
s.str.Store("temp")
} else {
s.str.Store(strconv.Itoa(int(val)))
}
}

// A SQLInstanceID is an ephemeral ID assigned to a running instance of the SQL
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/cdcutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/settings",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/quotapool",
"//pkg/util/timeutil",
"//pkg/util/tracing",
],
)
Expand Down
57 changes: 50 additions & 7 deletions pkg/ccl/changefeedccl/cdcutils/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

Expand All @@ -29,6 +31,7 @@ type Throttler struct {
messageLimiter *quotapool.RateLimiter
byteLimiter *quotapool.RateLimiter
flushLimiter *quotapool.RateLimiter
metrics *Metrics
}

// AcquireMessageQuota acquires quota for a message with the specified size.
Expand All @@ -43,10 +46,10 @@ func (t *Throttler) AcquireMessageQuota(ctx context.Context, sz int) error {
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-%s", t.name))
defer span.Finish()

if err := t.messageLimiter.WaitN(ctx, 1); err != nil {
if err := waitQuota(ctx, 1, t.messageLimiter, t.metrics.MessagesPushbackNanos); err != nil {
return err
}
return t.byteLimiter.WaitN(ctx, int64(sz))
return waitQuota(ctx, int64(sz), t.byteLimiter, t.metrics.BytesPushbackNanos)
}

// AcquireFlushQuota acquires quota for a message with the specified size.
Expand All @@ -60,8 +63,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-flush-%s", t.name))
defer span.Finish()

return t.flushLimiter.WaitN(ctx, 1)
return waitQuota(ctx, 1, t.flushLimiter, t.metrics.FlushPushbackNanos)
}

func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
Expand All @@ -85,7 +87,7 @@ func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
}

// NewThrottler creates a new throttler with the specified configuration.
func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Throttler {
func NewThrottler(name string, config changefeedbase.SinkThrottleConfig, m *Metrics) *Throttler {
logSlowAcquisition := quotapool.OnSlowAcquisition(500*time.Millisecond, quotapool.LogSlowAcquisition)
t := &Throttler{
name: name,
Expand All @@ -98,6 +100,7 @@ func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Thrott
flushLimiter: quotapool.NewRateLimiter(
fmt.Sprintf("%s-flushes", name), 0, 0, logSlowAcquisition,
),
metrics: m,
}
t.updateConfig(config)
return t
Expand All @@ -109,7 +112,7 @@ var nodeSinkThrottle = struct {
}{}

// NodeLevelThrottler returns node level Throttler for changefeeds.
func NodeLevelThrottler(sv *settings.Values) *Throttler {
func NodeLevelThrottler(sv *settings.Values, metrics *Metrics) *Throttler {
getConfig := func() (config changefeedbase.SinkThrottleConfig) {
configStr := changefeedbase.NodeSinkThrottleConfig.Get(sv)
if configStr != "" {
Expand All @@ -126,7 +129,7 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler {
if nodeSinkThrottle.Throttler != nil {
panic("unexpected state")
}
nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig())
nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig(), metrics)
// Update node throttler configs when settings change.
changefeedbase.NodeSinkThrottleConfig.SetOnChange(sv, func(ctx context.Context) {
nodeSinkThrottle.Throttler.updateConfig(getConfig())
Expand All @@ -135,3 +138,43 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler {

return nodeSinkThrottle.Throttler
}

// Metrics is a metric.Struct for kvfeed metrics.
type Metrics struct {
BytesPushbackNanos *metric.Counter
MessagesPushbackNanos *metric.Counter
FlushPushbackNanos *metric.Counter
}

// MakeMetrics constructs a Metrics struct with the provided histogram window.
func MakeMetrics(histogramWindow time.Duration) Metrics {
makeMetric := func(n string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("changefeed.%s.messages_pushback_nanos", n),
Help: fmt.Sprintf("Total time spent throttled for %s quota", n),
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
}

return Metrics{
BytesPushbackNanos: metric.NewCounter(makeMetric("bytes")),
MessagesPushbackNanos: metric.NewCounter(makeMetric("messages")),
FlushPushbackNanos: metric.NewCounter(makeMetric("flush")),
}
}

var _ metric.Struct = (*Metrics)(nil)

// MetricStruct makes Metrics a metric.Struct.
func (m Metrics) MetricStruct() {}

func waitQuota(
ctx context.Context, n int64, limit *quotapool.RateLimiter, c *metric.Counter,
) error {
start := timeutil.Now()
defer func() {
c.Inc(int64(timeutil.Now().Sub(start)))
}()
return limit.WaitN(ctx, n)
}
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/cdcutils/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdcutils
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -24,8 +25,8 @@ func TestNodeLevelThrottler(t *testing.T) {
defer log.Scope(t).Close(t)

sv := &cluster.MakeTestingClusterSettings().SV

throttler := NodeLevelThrottler(sv)
m := MakeMetrics(time.Minute)
throttler := NodeLevelThrottler(sv, &m)

// Default: no throttling
require.True(t, throttler.messageLimiter.AdmitN(10000000))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (ca *changeAggregator) startKVFeed(
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics),
cdcutils.NodeLevelThrottler(&cfg.Settings.SV))
cdcutils.NodeLevelThrottler(&cfg.Settings.SV, &ca.metrics.ThrottleMetrics))

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -233,6 +234,7 @@ type Metrics struct {
Running *metric.Gauge

FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -270,7 +272,9 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {

Running: metric.NewGauge(metaChangefeedRunning),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
Expand Down
11 changes: 2 additions & 9 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"math/rand"
"reflect"
"runtime"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -225,14 +224,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
}
}
var splitRetries int
for _, rec := range getRec() {
for _, l := range rec.Logs {
for _, line := range l.Fields {
if strings.Contains(line.Value.StripMarkers(), "SSTable cannot be added spanning range bounds") {
splitRetries++
}
}
}
for _, sp := range getRec() {
splitRetries += tracing.CountLogMessages(sp, "SSTable cannot be added spanning range bounds")
}
if splitRetries != expectedSplitRetries {
t.Fatalf("expected %d split-caused retries, got %d", expectedSplitRetries, splitRetries)
Expand Down
22 changes: 8 additions & 14 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,16 +1018,14 @@ func (m *monitor) collectRecordings() string {
rec := g.collect()
for _, span := range rec {
for _, log := range span.Logs {
for _, field := range log.Fields {
if prev > 0 {
prev--
continue
}
logs = append(logs, logRecord{
g: g, value: field.Value.StripMarkers(),
})
g.prevEvents++
if prev > 0 {
prev--
continue
}
logs = append(logs, logRecord{
g: g, value: log.Msg().StripMarkers(),
})
g.prevEvents++
}
}
if atomic.LoadInt32(&g.finished) == 1 {
Expand Down Expand Up @@ -1068,11 +1066,7 @@ func (m *monitor) hasNewEvents(g *monitoredGoroutine) bool {
events := 0
rec := g.collect()
for _, span := range rec {
for _, log := range span.Logs {
for range log.Fields {
events++
}
}
events += len(span.Logs)
}
return events > g.prevEvents
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func (ctx *Context) grpcDialOptions(
// is in setupSpanForIncomingRPC().
//
tagger := func(span *tracing.Span) {
span.SetTag("node", attribute.StringValue(ctx.NodeID.Get().String()))
span.SetTag("node", attribute.IntValue(int(ctx.NodeID.Get())))
}
unaryInterceptors = append(unaryInterceptors,
tracing.ClientInterceptor(tracer, tagger))
Expand Down
Loading

0 comments on commit 2e11a18

Please sign in to comment.