130110: ops: add ttl metrics for certificate expiration r=angles-n-daemons a=angles-n-daemons

ops: add ttl metrics for certificate expiration

Currently, cockroach only exposes point-in-time certificate
expiration metrics: the gauge
`security.certificate.expiration.<cert-type>` reports the Unix
timestamp at which the certificate will expire (for example, a
timestamp one day in the future). This PR also exposes a TTL metric,
`security.certificate.ttl.<cert-type>`, so that consumers of this
information can run operations based on their distance to certificate
expiration without additional transformations.

Additionally, this PR refactors how the expiration gauges are set,
so that reads of the gauge directly reference the value of the
certificate.
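
For illustration, a minimal, self-contained sketch of the two kinds of
gauges side by side, using the aggmetric helpers this diff relies on.
The metric names, label, and user key below are illustrative, not the
exact wiring in this PR:

	package main

	import (
		"fmt"
		"time"

		"github.com/cockroachdb/cockroach/pkg/util/metric"
		"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
	)

	func main() {
		expParent := aggmetric.MakeBuilder("sql_user").Gauge(
			metric.Metadata{Name: "security.certificate.expiration.client"})
		ttlParent := aggmetric.MakeBuilder("sql_user").Gauge(
			metric.Metadata{Name: "security.certificate.ttl.client"})

		expiry := time.Now().Add(24 * time.Hour).Unix()

		// The expiration child stores a fixed point-in-time Unix timestamp.
		exp := expParent.AddChild("foo")
		exp.Update(expiry)

		// The TTL child recomputes the remaining seconds on every read.
		ttl := ttlParent.AddFunctionalChild(func() int64 {
			if d := expiry - time.Now().Unix(); d > 0 {
				return d
			}
			return 0
		}, "foo")

		fmt.Println(exp.Value(), ttl.Value())
	}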

Epic: CRDB-40209
Fixes: cockroachdb#77376

Release note (ops change): new metrics expose the TTL for various
certificates.


130699: replica_rac2: remove a todo and add another one r=pav-kv a=sumeerbhola

The first todo, related to Processor.OnDescChangedLocked, is obsolete, since we enqueue for ready processing. But ready processing is not unconditional, hence the new todo.

Epic: CRDB-37515

Release note: None

Co-authored-by: angles-n-daemons <brian.dillmann@cockroachlabs.com>
Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
3 people committed Sep 13, 2024
3 parents ae4d08b + c830973 + fa79ddb commit 66edff7
Showing 10 changed files with 393 additions and 125 deletions.
25 changes: 14 additions & 11 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -297,14 +297,6 @@ type Processor interface {
// tenantID passed in all calls must be the same.
//
// Both Replica mu and raftMu are held.
//
// TODO(sumeer): we are currently delaying the processing caused by this
// until HandleRaftReadyRaftMuLocked, including telling the
// RangeController. However, RangeController.WaitForEval needs to have the
// latest state. We need to either (a) change this
// OnDescChangedRaftMuLocked, or (b) add a method in RangeController that
// only updates the voting replicas used in WaitForEval, and call that
// from OnDescChangedLocked, and do the rest of the updating later.
OnDescChangedLocked(
ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID)

@@ -610,9 +602,20 @@ func (p *processorImpl) OnDescChangedLocked(
}
p.desc.replicas = descToReplicaSet(desc)
p.desc.replicasChanged = true
// We need to promptly return tokens if some replicas have been removed,
// since those tokens could be used by other ranges with replicas on the
// same store. Ensure that promptness by scheduling ready.
// We need to promptly:
// - Return tokens if some replicas have been removed, since those tokens
// could be used by other ranges with replicas on the same store.
// - Update (create) the RangeController's state used in WaitForEval, and
// for sending MsgApps to new replicas (by creating replicaSendStreams).
//
// We ensure that promptness by scheduling ready.
//
// TODO(sumeer): this is currently gated on !initialization due to a
// kvserver test failure in a quiescence test that ought to be rewritten.
// So if processorImpl starts in pull mode, this replica is the leader,
// there are no new entries being written, and other replicas have a
// send-queue, MsgApps can be (arbitrarily?) delayed. Change this to
// unconditionally call EnqueueRaftReady.
if !initialization {
p.opts.RaftScheduler.EnqueueRaftReady(p.opts.RangeID)
}
1 change: 1 addition & 0 deletions pkg/security/BUILD.bazel
@@ -66,6 +66,7 @@ go_test(
"cert_expiry_cache_test.go",
"certificate_loader_test.go",
"certificate_manager_test.go",
"certificate_metrics_test.go",
"certs_rotation_test.go",
"certs_tenant_test.go",
"certs_test.go",
84 changes: 65 additions & 19 deletions pkg/security/cert_expiry_cache.go
@@ -39,6 +39,11 @@ var ClientCertExpirationCacheCapacity = settings.RegisterIntSetting(
1000,
settings.WithPublic)

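// clientCertExpirationMetrics pairs the two per-user gauges stored as a
// single cache entry: the point-in-time expiration and its derived TTL.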
type clientCertExpirationMetrics struct {
expiration aggmetric.Gauge
ttl aggmetric.Gauge
}

// ClientCertExpirationCache contains a cache of gauge objects keyed by
// SQL username strings. It is a FIFO cache that stores gauges valued by
// minimum expiration of the client certs seen (per user).
@@ -87,12 +92,14 @@ func NewClientCertExpirationCache(
return int64(size) > capacity
},
OnEvictedEntry: func(entry *cache.Entry) {
gauge := entry.Value.(*aggmetric.Gauge)
metrics := entry.Value.(*clientCertExpirationMetrics)
// The child metric will continue to report into the parent metric even
// after unlinking, so we also reset it to 0.
gauge.Update(0)
gauge.Unlink()
c.mu.acc.Shrink(ctx, int64(unsafe.Sizeof(*gauge)))
metrics.expiration.Update(0)
metrics.expiration.Unlink()
metrics.ttl.Update(0)
metrics.ttl.Unlink()
c.mu.acc.Shrink(ctx, int64(unsafe.Sizeof(*metrics)))
},
})
c.mon = mon.NewMonitorInheritWithLimit(
@@ -112,50 +119,89 @@
return c
}

// Get retrieves the cert expiration for the given username, if it exists.
// GetTTL retrieves the number of seconds until cert expiration for the
// given username, if it exists. A TTL of 0 indicates an entry was not found.
func (c *ClientCertExpirationCache) GetTTL(key string) (int64, bool) {
c.mu.Lock()
defer c.mu.Unlock()
value, ok := c.mu.cache.Get(key)
if !ok {
return 0, ok
}
// If the expiration has already been reached, remove the entry and indicate
// that the entry was not found.
metrics := value.(*clientCertExpirationMetrics)
if metrics.expiration.Value() < c.timeNow() {
c.mu.cache.Del(key)
return 0, false
}
return metrics.ttl.Value(), ok
}

// GetExpiration retrieves the cert expiration for the given username, if it exists.
// An expiration of 0 indicates an entry was not found.
func (c *ClientCertExpirationCache) Get(key string) (int64, bool) {
func (c *ClientCertExpirationCache) GetExpiration(key string) (int64, bool) {
c.mu.Lock()
defer c.mu.Unlock()
value, ok := c.mu.cache.Get(key)
if !ok {
return 0, ok
}
// If the expiration has already been reached, remove the entry and indicate
// If the expiration has already been reached, remove the entry and indicate
// that the entry was not found.
gauge := value.(*aggmetric.Gauge)
if gauge.Value() < c.timeNow() {
metrics := value.(*clientCertExpirationMetrics)
if metrics.expiration.Value() < c.timeNow() {
c.mu.cache.Del(key)
return 0, false
}
return gauge.Value(), ok
return metrics.expiration.Value(), ok
}

// ttlFunc returns a function that reports the number of seconds until
// the given expiration, or 0 if the expiration has already passed.
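// For example, with a clock frozen at t=100 and exp=130, the returned
// closure reports 30; once the clock passes 130, it reports 0.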
func ttlFunc(now func() int64, exp int64) func() int64 {
return func() int64 {
if ttl := exp - now(); ttl > 0 {
return ttl
}
return 0
}
}

// MaybeUpsert may update or insert the client cert expiration metrics for
// a particular user into the cache. An update happens only when the old
// expiration is after the new expiration. This ensures that the cache
// maintains the minimum expiration for each user.
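// For example, upserting expirations 200 and then 150 for the same user
// leaves 150 in place, and a later upsert of 300 is a no-op.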
func (c *ClientCertExpirationCache) MaybeUpsert(
ctx context.Context, key string, newExpiry int64, parentGauge *aggmetric.AggGauge,
ctx context.Context,
key string,
newExpiry int64,
parentExpirationGauge *aggmetric.AggGauge,
parentTTLGauge *aggmetric.AggGauge,
) {
c.mu.Lock()
defer c.mu.Unlock()

value, ok := c.mu.cache.Get(key)
if !ok {
err := c.mu.acc.Grow(ctx, int64(unsafe.Sizeof(aggmetric.Gauge{})))
err := c.mu.acc.Grow(ctx, int64(unsafe.Sizeof(clientCertExpirationMetrics{})))
if err == nil {
// Only create new gauges for expirations in the future.
if newExpiry > c.timeNow() {
gauge := parentGauge.AddChild(key)
gauge.Update(newExpiry)
c.mu.cache.Add(key, gauge)
expiration := parentExpirationGauge.AddChild(key)
expiration.Update(newExpiry)
ttl := parentTTLGauge.AddFunctionalChild(ttlFunc(c.timeNow, newExpiry), key)
c.mu.cache.Add(key, &clientCertExpirationMetrics{*expiration, *ttl})
}
} else {
log.Ops.Warningf(ctx, "no memory available to cache cert expiry: %v", err)
}
} else if gauge := value.(*aggmetric.Gauge); newExpiry < gauge.Value() || gauge.Value() == 0 {
gauge.Update(newExpiry)
} else if metrics := value.(*clientCertExpirationMetrics); newExpiry < metrics.expiration.Value() || metrics.expiration.Value() == 0 {
metrics.expiration.Update(newExpiry)
metrics.ttl.UpdateFn(ttlFunc(c.timeNow, newExpiry))
}
}

@@ -216,8 +262,8 @@ func (c *ClientCertExpirationCache) PurgePastExpirations() {
var deleteEntryKeys []interface{}
now := c.timeNow()
c.mu.cache.Do(func(entry *cache.Entry) {
gauge := entry.Value.(*aggmetric.Gauge)
if gauge.Value() <= now {
metrics := entry.Value.(*clientCertExpirationMetrics)
if metrics.expiration.Value() <= now {
deleteEntryKeys = append(deleteEntryKeys, entry.Key)
}
})
83 changes: 48 additions & 35 deletions pkg/security/cert_expiry_cache_test.go
@@ -41,75 +41,88 @@ func TestEntryCache(t *testing.T) {

ctx := context.Background()

timesource := timeutil.NewManualTime(timeutil.Unix(0, 123))
// Create a cache with a capacity of 3.
cache, metric := newCache(
cache, expMetric, ttlMetric := newCache(
ctx,
&cluster.Settings{},
3, /* capacity */
timeutil.NewManualTime(timeutil.Unix(0, 123)),
timesource,
)
require.Equal(t, 0, cache.Len())

// Verify insert.
cache.MaybeUpsert(ctx, fooUser, laterExpiration, metric)
cache.MaybeUpsert(ctx, fooUser, laterExpiration, expMetric, ttlMetric)
require.Equal(t, 1, cache.Len())

// Verify update.
cache.MaybeUpsert(ctx, fooUser, closerExpiration, metric)
cache.MaybeUpsert(ctx, fooUser, closerExpiration, expMetric, ttlMetric)
require.Equal(t, 1, cache.Len())

// Verify retrieval.
expiration, found := cache.Get(fooUser)
expiration, found := cache.GetExpiration(fooUser)
require.Equal(t, true, found)
require.Equal(t, closerExpiration, expiration)

// Verify the cache retains the minimum expiration for a user, assuming no
// eviction.
cache.MaybeUpsert(ctx, barUser, closerExpiration, metric)
cache.MaybeUpsert(ctx, barUser, closerExpiration, expMetric, ttlMetric)
require.Equal(t, 2, cache.Len())
cache.MaybeUpsert(ctx, barUser, laterExpiration, metric)
cache.MaybeUpsert(ctx, barUser, laterExpiration, expMetric, ttlMetric)
require.Equal(t, 2, cache.Len())
expiration, found = cache.Get(barUser)
expiration, found = cache.GetExpiration(barUser)
require.Equal(t, true, found)
require.Equal(t, closerExpiration, expiration)

// Verify indication of absence for non-existent values.
expiration, found = cache.Get(fakeUser)
expiration, found = cache.GetExpiration(fakeUser)
require.Equal(t, false, found)
require.Equal(t, int64(0), expiration)

// Verify eviction when the capacity is exceeded.
cache.MaybeUpsert(ctx, blahUser, laterExpiration, metric)
cache.MaybeUpsert(ctx, blahUser, laterExpiration, expMetric, ttlMetric)
require.Equal(t, 3, cache.Len())
cache.MaybeUpsert(ctx, fakeUser, closerExpiration, metric)
cache.MaybeUpsert(ctx, fakeUser, closerExpiration, expMetric, ttlMetric)
require.Equal(t, 3, cache.Len())
_, found = cache.Get(fooUser)
_, found = cache.GetExpiration(fooUser)
require.Equal(t, false, found)
_, found = cache.Get(barUser)
_, found = cache.GetExpiration(barUser)
require.Equal(t, true, found)

// Verify previous entries can be inserted after the cache is cleared.
cache.Clear()
require.Equal(t, 0, cache.Len())
_, found = cache.Get(fooUser)
_, found = cache.GetExpiration(fooUser)
require.Equal(t, false, found)
_, found = cache.Get(barUser)
_, found = cache.GetExpiration(barUser)
require.Equal(t, false, found)
cache.MaybeUpsert(ctx, fooUser, laterExpiration, metric)
cache.MaybeUpsert(ctx, fooUser, laterExpiration, expMetric, ttlMetric)
require.Equal(t, 1, cache.Len())
cache.MaybeUpsert(ctx, barUser, laterExpiration, metric)
cache.MaybeUpsert(ctx, barUser, laterExpiration, expMetric, ttlMetric)
require.Equal(t, 2, cache.Len())
expiration, found = cache.Get(fooUser)
expiration, found = cache.GetExpiration(fooUser)
require.Equal(t, true, found)
require.Equal(t, laterExpiration, expiration)
expiration, found = cache.Get(barUser)
expiration, found = cache.GetExpiration(barUser)
require.Equal(t, true, found)
require.Equal(t, laterExpiration, expiration)

// Verify expirations in the past cannot be inserted into the cache.
cache.Clear()
cache.MaybeUpsert(ctx, fooUser, int64(0), metric)
cache.MaybeUpsert(ctx, fooUser, int64(0), expMetric, ttlMetric)
require.Equal(t, 0, cache.Len())

// Verify the values of the TTL metrics.
cache.Clear()
timesource.AdvanceTo(timeutil.Unix(closerExpiration+20, 0))
cache.MaybeUpsert(ctx, fooUser, closerExpiration, expMetric, ttlMetric)
cache.MaybeUpsert(ctx, barUser, laterExpiration, expMetric, ttlMetric)
ttl, found := cache.GetTTL(fooUser)
require.Equal(t, false, found)
require.Equal(t, int64(0), ttl)
ttl, found = cache.GetTTL(barUser)
require.Equal(t, true, found)
require.Equal(t, laterExpiration-(closerExpiration+20), ttl)
}

func TestPurgePastEntries(t *testing.T) {
@@ -130,31 +143,31 @@ func TestPurgePastEntries(t *testing.T) {

// Create a cache with a capacity of 4.
clock := timeutil.NewManualTime(timeutil.Unix(0, 123))
cache, metric := newCache(ctx, &cluster.Settings{}, 4 /* capacity */, clock)
cache, expMetric, ttlMetric := newCache(ctx, &cluster.Settings{}, 4 /* capacity */, clock)

// Insert entries that we expect to be cleaned up after advancing in time.
cache.MaybeUpsert(ctx, fooUser, pastExpiration1, metric)
cache.MaybeUpsert(ctx, barUser, pastExpiration2, metric)
cache.MaybeUpsert(ctx, blahUser, pastExpiration2, metric)
cache.MaybeUpsert(ctx, fooUser, pastExpiration1, expMetric, ttlMetric)
cache.MaybeUpsert(ctx, barUser, pastExpiration2, expMetric, ttlMetric)
cache.MaybeUpsert(ctx, blahUser, pastExpiration2, expMetric, ttlMetric)
// Insert an entry that should NOT be removed after advancing in time
// because it is still in the future.
cache.MaybeUpsert(ctx, bazUser, futureExpiration, metric)
cache.MaybeUpsert(ctx, bazUser, futureExpiration, expMetric, ttlMetric)
require.Equal(t, 4, cache.Len())

// Advance time so that expirations have been reached already.
clock.AdvanceTo(timeutil.Unix(2000000000, 123))

// Verify an expiration from the past cannot be retrieved. Confirm it has
// been removed after the attempt as well.
_, found := cache.Get(fooUser)
_, found := cache.GetExpiration(fooUser)
require.Equal(t, false, found)
require.Equal(t, 3, cache.Len())

// Verify that the cache gets cleaned of past expirations, and confirm
// that expirations in the future do not get removed.
cache.PurgePastExpirations()
require.Equal(t, 1, cache.Len())
_, found = cache.Get(bazUser)
_, found = cache.GetExpiration(bazUser)
require.Equal(t, true, found)
}

@@ -167,7 +180,7 @@ func TestConcurrentUpdates(t *testing.T) {
st := &cluster.Settings{}

// Create a cache with a large capacity.
cache, metric := newCache(
cache, expMetric, ttlMetric := newCache(
ctx,
st,
10000, /* capacity */
@@ -186,7 +199,7 @@
for i := 0; i < N; i++ {
go func(i int) {
if i%2 == 1 {
cache.MaybeUpsert(ctx, user, expiration, metric)
cache.MaybeUpsert(ctx, user, expiration, expMetric, ttlMetric)
} else {
cache.Clear()
}
@@ -201,19 +214,19 @@ func BenchmarkCertExpirationCacheInsert(b *testing.B) {
ctx := context.Background()
st := &cluster.Settings{}
clock := timeutil.NewManualTime(timeutil.Unix(0, 123))
cache, metric := newCache(ctx, st, 1000 /* capacity */, clock)
cache, expMetric, ttlMetric := newCache(ctx, st, 1000 /* capacity */, clock)

b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.MaybeUpsert(ctx, "foo", clock.Now().Unix(), metric)
cache.MaybeUpsert(ctx, "bar", clock.Now().Unix(), metric)
cache.MaybeUpsert(ctx, "blah", clock.Now().Unix(), metric)
cache.MaybeUpsert(ctx, "foo", clock.Now().Unix(), expMetric, ttlMetric)
cache.MaybeUpsert(ctx, "bar", clock.Now().Unix(), expMetric, ttlMetric)
cache.MaybeUpsert(ctx, "blah", clock.Now().Unix(), expMetric, ttlMetric)
}
}

func newCache(
ctx context.Context, st *cluster.Settings, capacity int, clock *timeutil.ManualTime,
) (*security.ClientCertExpirationCache, *aggmetric.AggGauge) {
) (*security.ClientCertExpirationCache, *aggmetric.AggGauge, *aggmetric.AggGauge) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
security.ClientCertExpirationCacheCapacity.Override(ctx, &st.SV, int64(capacity))
@@ -222,5 +235,5 @@
Settings: st,
})
cache := security.NewClientCertExpirationCache(ctx, st, stopper, clock, parentMon)
return cache, aggmetric.MakeBuilder(security.SQLUserLabel).Gauge(metric.Metadata{})
return cache, aggmetric.MakeBuilder(security.SQLUserLabel).Gauge(metric.Metadata{}), aggmetric.MakeBuilder(security.SQLUserLabel).Gauge(metric.Metadata{})
}
2 changes: 2 additions & 0 deletions pkg/security/certificate_loader.go
@@ -100,6 +100,8 @@ func (p PemUsage) String() string {
return "Client"
case TenantPem:
return "Tenant Client"
case TenantSigningPem:
return "Tenant Signing"
default:
return "unknown"
}