From 2acfad2b82d5bef6a6c9c2daae072bac4956284c Mon Sep 17 00:00:00 2001 From: Vijay Karthik Date: Mon, 1 Oct 2018 14:15:45 +0530 Subject: [PATCH] server: add a configuration to enable GC of system.rangelog system.rangelog table currently grows unboundedly. The rate of growth is slow (as long as there is no replica rebalancing thrashing), but it can still become a problem in long running clusters. This commit adds cluster settings to specify interval and TTL for rows in system.rangelog. By default, TTL of system.rangelog is set to 30 days. Fixes #21260 Release note: Add configuration to enable GC of system.rangelog --- docs/generated/settings/settings.html | 2 + pkg/server/server.go | 150 ++++++++++++++++++++ pkg/server/testserver.go | 10 ++ pkg/storage/log_test.go | 189 ++++++++++++++++++++++++++ pkg/storage/store.go | 4 + 5 files changed, 355 insertions(+) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 48bdd36540f3..fc8811d08c96 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -49,6 +49,8 @@ server.failed_reservation_timeoutduration5sthe amount of time to consider the store throttled for up-replication after a failed reservation call server.heap_profile.max_profilesinteger5maximum number of profiles to be kept. Profiles with lower score are GC'ed, but latest profile is always kept server.heap_profile.system_memory_threshold_fractionfloat0.85fraction of system memory beyond which if Rss increases, then heap profile is triggered +server.rangelog.gc_intervalduration30m0sinterval for running gc on rangelog. If storage.rangelog.ttl is non zero, range log entries older than server.rangelog.ttl are deleted +server.rangelog.ttlduration720h0m0sif non zero, range log entries older than this duration are deleted periodically based on storage.rangelog.gc_interval server.remote_debugging.modestringlocalset to enable remote debugging, localhost-only or disable (any, local, off) server.shutdown.drain_waitduration0sthe amount of time a server waits in an unready state before proceeding with the rest of the shutdown process server.shutdown.query_waitduration10sthe server will wait for at least this amount of time for active queries to finish diff --git a/pkg/server/server.go b/pkg/server/server.go index 9959a666852a..ccd6f3216ae0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc" "github.com/cockroachdb/cmux" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/internal/client" @@ -83,6 +84,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" ) +const ( + // defaultRangeLogGCInterval is the default interval to run gc on rangelog. + defaultRangeLogGCInterval = 30 * time.Minute +) + var ( // Allocation pool for gzipResponseWriters. gzipResponseWriterPool sync.Pool @@ -120,6 +126,25 @@ var ( "feature.", 0, ) + + // RangeLogTTL is the TTL for rows in system.rangelog. If non zero, range log + // entries are periodically garbage collectd. The period is given by + // server.rangelog.gc_interval + RangeLogTTL = settings.RegisterDurationSetting( + "server.rangelog.ttl", + "if non zero, range log entries older than this duration are deleted periodically "+ + "based on storage.rangelog.gc_interval", + 30*24*time.Hour, // 30 days + ) + + // RangeLogGCInterval is the interval between subsequent runs of gc on + // system.rangelog. + RangeLogGCInterval = settings.RegisterDurationSetting( + "server.rangelog.gc_interval", + "interval for running gc on rangelog. If storage.rangelog.ttl is non zero, "+ + "range log entries older than server.rangelog.ttl are deleted", + defaultRangeLogGCInterval, + ) ) // TODO(peter): Until go1.11, ServeMux.ServeHTTP was not safe to call @@ -1037,6 +1062,129 @@ func (s *Server) startPersistingHLCUpperBound( ) } +// gcRangeLog deletes entries in system.rangelog older than the given +// cutoffTimestamp if the server is the lease holder for range 1. +// Leaseholder constraint is present so that only one node in the cluster +// performs rangelog gc. +// It returns the node currently responsible for performing GC, the number of +// rows affected and error (if any). +func (s *Server) gcRangeLog( + ctx context.Context, cutoffTimestamp time.Time, +) (roachpb.NodeID, int, error) { + const selectLeaseHolderStmt = `SELECT lease_holder from crdb_internal.ranges where range_id=1` + const deleteStmt = `DELETE FROM system.rangelog WHERE timestamp <= $1 LIMIT 1000` + var totalRowsAffected int + var gcNode roachpb.NodeID + for { + var rowsAffected int + err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + if storeKnobs, ok := s.cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs); ok && storeKnobs.RangelogGCNode != 0 { + gcNode = storeKnobs.RangelogGCNode + + } else { + row, err := s.internalExecutor.QueryRow( + ctx, + "leaseholder-r1", + txn, + selectLeaseHolderStmt, + ) + if err != nil { + return err + } + + if row == nil { + return errors.New("lease holder not found for range 1") + } + + leaseHolder, ok := row[0].(*tree.DInt) + if !ok { + return errors.New("lease holder is not an int") + } + if leaseHolder == nil || *leaseHolder <= 0 { + return errors.Errorf("invalid lease holder %v", leaseHolder) + } + + gcNode = roachpb.NodeID(*leaseHolder) + } + + if gcNode != s.node.Descriptor.NodeID { + return nil + } + + var err error + rowsAffected, err = s.internalExecutor.Exec( + ctx, + "rangelog-gc", + txn, + deleteStmt, + cutoffTimestamp, + ) + return err + }) + totalRowsAffected += rowsAffected + if err != nil || rowsAffected == 0 { + return gcNode, totalRowsAffected, err + } + } +} + +// startRangeLogGC starts a worker which periodically GCs system.rangelog. +// The period is controlled by server.rangelog.gc_interval and the TTL is +// controlled by server.rangelog.ttl +func (s *Server) startRangeLogGC(ctx context.Context) { + s.stopper.RunWorker(ctx, func(ctx context.Context) { + intervalChangeCh := make(chan time.Duration) + interval := RangeLogGCInterval.Get(&s.cfg.Settings.SV) + RangeLogGCInterval.SetOnChange(&s.cfg.Settings.SV, func() { + intervalChangeCh <- RangeLogGCInterval.Get(&s.cfg.Settings.SV) + }) + t := time.NewTimer(interval) + defer t.Stop() + for { + select { + case interval = <-intervalChangeCh: + if !t.Stop() { + <-t.C + } + t.Reset(interval) + case <-t.C: + ttl := RangeLogTTL.Get(&s.cfg.Settings.SV) + if ttl > 0 { + cutoffTimestamp := timeutil.Unix(0, s.db.Clock().PhysicalNow()-int64(ttl)) + gcNode, rowsAffected, err := s.gcRangeLog(ctx, cutoffTimestamp) + if err != nil { + log.Errorf( + ctx, + "error garbage collecting rangelog %v", + err, + ) + } + + if rowsAffected > 0 || gcNode == s.node.Descriptor.NodeID { + log.Infof(ctx, "garbage collected %d rows from rangelog", rowsAffected) + } + + if gcNode != 0 && gcNode != s.node.Descriptor.NodeID { + log.Infof(ctx, "n%d is currently responsible for rangelog gc", gcNode) + } + } + + if storeKnobs, ok := s.cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs); ok && storeKnobs.RangelogGCDone != nil { + select { + case storeKnobs.RangelogGCDone <- struct{}{}: + case <-s.stopper.ShouldStop(): + // Test has finished + return + } + } + t.Reset(interval) + case <-s.stopper.ShouldStop(): + return + } + } + }) +} + // Start starts the server on the specified port, starts gossip and initializes // the node using the engines from the server's context. This is complex since // it sets up the listeners and the associated port muxing, but especially since @@ -1679,6 +1827,8 @@ func (s *Server) Start(ctx context.Context) error { }) } + s.startRangeLogGC(ctx) + // Record that this node joined the cluster in the event log. Since this // executes a SQL query, this must be done after the SQL layer is ready. s.node.recordJoinEvent() diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 7c1a673ca27b..2ab3eb9b7d7d 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -827,6 +827,16 @@ func (ts *TestServer) ExecutorConfig() interface{} { return *ts.execCfg } +// GCRangeLog deletes rows older than the given cutoffTimestamp from +// system.rangelog +// It returns the node currently responsible for performing GC, the number of +// rows affected and error (if any). +func (ts *TestServer) GCRangeLog( + ctx context.Context, cutoffTimestamp time.Time, +) (roachpb.NodeID, int, error) { + return ts.gcRangeLog(ctx, cutoffTimestamp) +} + type testServerFactoryImpl struct{} // TestServerFactory can be passed to serverutils.InitTestServerFactory diff --git a/pkg/storage/log_test.go b/pkg/storage/log_test.go index d7717711b1dc..19b223ac5d8f 100644 --- a/pkg/storage/log_test.go +++ b/pkg/storage/log_test.go @@ -20,8 +20,10 @@ import ( "encoding/json" "net/url" "testing" + "time" _ "github.com/lib/pq" + "github.com/stretchr/testify/assert" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/internal/client" @@ -395,3 +397,190 @@ func TestLogRebalances(t *testing.T) { t.Errorf("expected %d RemoveReplica events logged, found %d", e, a) } } + +func TestLogGC(t *testing.T) { + defer leaktest.AfterTest(t)() + a := assert.New(t) + storeKnobs := &storage.StoreTestingKnobs{} + params := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: storeKnobs, + }, + } + + s, db, kvDB := serverutils.StartServer(t, params) + ts := s.(*server.TestServer) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + const testRangeID = 10001 + const node1 = roachpb.NodeID(1) + store, pErr := ts.Stores().GetStore(ts.GetFirstStoreID()) + if pErr != nil { + t.Fatal(pErr) + } + + rangeLogRowCount := func() int { + var count int + err := db.QueryRowContext(ctx, + `SELECT count(*) FROM system.rangelog WHERE "rangeID" = $1`, + testRangeID, + ).Scan(&count) + if err != nil { + t.Fatal(err) + } + return count + } + + rangeLogMaxTS := func() time.Time { + var time time.Time + err := db.QueryRowContext(ctx, + `SELECT timestamp FROM system.rangelog WHERE "rangeID" = $1 ORDER by timestamp DESC LIMIT 1`, + testRangeID, + ).Scan(&time) + if err != nil { + t.Fatal(err) + } + return time + } + + logEvents := func(count int) { + for i := 0; i < count; i++ { + a.NoError(kvDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + return store.LogReplicaChangeTest( + ctx, + txn, + roachpb.ADD_REPLICA, + roachpb.ReplicaDescriptor{ + NodeID: 1, + StoreID: 1, + }, + roachpb.RangeDescriptor{ + RangeID: testRangeID, + }, + storage.ReasonUnknown, + "", // details + ) + })) + } + } + + // Assert 0 rows before inserting any events + a.Equal(0, rangeLogRowCount()) + // Insert 100 events with timestamp of up to maxTs1 + logEvents(100) + a.Equal(100, rangeLogRowCount()) + maxTs1 := rangeLogMaxTS() + // Insert 50 events with timestamp of up to maxTs2 + logEvents(50) + a.Equal(150, rangeLogRowCount()) + maxTs2 := rangeLogMaxTS() + // Insert 25 events with timestamp of up to maxTs3 + logEvents(25) + a.Equal(175, rangeLogRowCount()) + maxTs3 := rangeLogMaxTS() + + // GC up to maxTs1 + gcNode, rowsGCd, err := ts.GCRangeLog(ctx, maxTs1) + a.NoError(err) + a.Equal(node1, gcNode) + a.True(rowsGCd >= 100, "Expected rowsGCd >= 100, found %d", rowsGCd) + a.Equal(75, rangeLogRowCount()) + + // GC up to maxTs2 + // When GCNode is 2, the server (Node 1) should not GC. + storeKnobs.RangelogGCNode = 2 + gcNode, rowsGCd, err = ts.GCRangeLog(ctx, maxTs2) + a.NoError(err) + a.Equal(roachpb.NodeID(2), gcNode) + a.Equal(0, rowsGCd) + a.Equal(75, rangeLogRowCount()) + + // GCNode is 1 again. the server (Node 1) should GC. + storeKnobs.RangelogGCNode = 1 + gcNode, rowsGCd, err = ts.GCRangeLog(ctx, maxTs2) + a.NoError(err) + a.Equal(node1, gcNode) + a.True(rowsGCd >= 50, "Expected rowsGCd >= 50, found %d", rowsGCd) + a.Equal(25, rangeLogRowCount()) + // Insert 2000 more events + logEvents(2000) + a.Equal(2025, rangeLogRowCount()) + + // Skip overriding GCNode + storeKnobs.RangelogGCNode = 0 + // GC up to maxTs3 + gcNode, rowsGCd, err = ts.GCRangeLog(ctx, maxTs3) + a.Equal(node1, gcNode) + a.NoError(err) + a.True(rowsGCd >= 25, "Expected rowsGCd >= 25, found %d", rowsGCd) + a.Equal(2000, rangeLogRowCount()) + + // GC everything + gcNode, rowsGCd, err = ts.GCRangeLog(ctx, rangeLogMaxTS()) + a.Equal(node1, gcNode) + a.NoError(err) + a.True(rowsGCd >= 2000, "Expected rowsGCd >= 2000, found %d", rowsGCd) + a.Equal(0, rangeLogRowCount()) +} + +func TestLogGCTrigger(t *testing.T) { + defer leaktest.AfterTest(t)() + a := assert.New(t) + gcDone := make(chan struct{}) + params := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &storage.StoreTestingKnobs{ + RangelogGCDone: gcDone, + }, + }, + } + s, db, _ := serverutils.StartServer(t, params) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + rangeLogRowCount := func(ts time.Time) int { + var count int + err := db.QueryRowContext(ctx, + `SELECT count(*) FROM system.rangelog WHERE timestamp <= $1`, + ts, + ).Scan(&count) + if err != nil { + t.Fatal(err) + } + return count + } + + rangeLogMaxTS := func() time.Time { + var time time.Time + err := db.QueryRowContext(ctx, + `SELECT timestamp FROM system.rangelog ORDER by timestamp DESC LIMIT 1`, + ).Scan(&time) + if err != nil { + t.Fatal(err) + } + return time + } + + maxTs := rangeLogMaxTS() + a.NotEqual(rangeLogRowCount(maxTs), 0, "Expected non zero number of events before %v", maxTs) + + server.RangeLogGCInterval.Override(&s.ClusterSettings().SV, time.Nanosecond) + // Reading gcDone once ensures that the previous gc is done + // (it could have been done long back and is waiting to send on this channel), + // and the next gc has started. + // Reading it twice guarantees that the next gc has also completed. + // Before running the assertions below one gc run has to be guaranteed. + <-gcDone + <-gcDone + a.NotEqual( + rangeLogRowCount(maxTs), + 0, + "Expected non zero number of events before %v as gc is not enabled", + maxTs, + ) + + server.RangeLogTTL.Override(&s.ClusterSettings().SV, time.Nanosecond) + <-gcDone + <-gcDone + a.Equal(0, rangeLogRowCount(maxTs), "Expected zero events before %v after gc", maxTs) +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 623eb2b5cde8..4a9eb54d4b4a 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -853,6 +853,10 @@ type StoreTestingKnobs struct { DisableLeaseCapacityGossip bool // BootstrapVersion overrides the version the stores will be bootstrapped with. BootstrapVersion *cluster.ClusterVersion + // RangelogGCNode is used to override the node performing rangelog GC. + RangelogGCNode roachpb.NodeID + // RangelogGCDone is used to notify when rangelog GC is done + RangelogGCDone chan<- struct{} } var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}