diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 63f9312fa5b2..29ec49b17c0f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -46,9 +46,11 @@ server.clock.persist_upper_bound_intervalduration0sthe interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature. server.consistency_check.intervalduration24h0m0sthe time between range consistency checks; set to 0 to disable consistency checking server.declined_reservation_timeoutduration1sthe amount of time to consider the store throttled for up-replication after a reservation was declined +server.eventlog.ttlduration2160h0m0sif nonzero, event log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours server.failed_reservation_timeoutduration5sthe amount of time to consider the store throttled for up-replication after a failed reservation call server.heap_profile.max_profilesinteger5maximum number of profiles to be kept. Profiles with lower score are GC'ed, but latest profile is always kept server.heap_profile.system_memory_threshold_fractionfloat0.85fraction of system memory beyond which if Rss increases, then heap profile is triggered +server.rangelog.ttlduration720h0m0sif nonzero, range log entries older than this duration are deleted every 10m0s. 
Should not be lowered below 24 hours server.remote_debugging.modestringlocalset to enable remote debugging, localhost-only or disable (any, local, off) server.shutdown.drain_waitduration0sthe amount of time a server waits in an unready state before proceeding with the rest of the shutdown process server.shutdown.query_waitduration10sthe server will wait for at least this amount of time for active queries to finish diff --git a/pkg/server/server.go b/pkg/server/server.go index 2a2cacc2596c..8b9aea70a12b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1680,6 +1680,8 @@ func (s *Server) Start(ctx context.Context) error { }) } + s.startSystemLogsGC(ctx) + // Record that this node joined the cluster in the event log. Since this // executes a SQL query, this must be done after the SQL layer is ready. s.node.recordJoinEvent() diff --git a/pkg/server/server_systemlog_gc.go b/pkg/server/server_systemlog_gc.go new file mode 100644 index 000000000000..66100bdfaadd --- /dev/null +++ b/pkg/server/server_systemlog_gc.go @@ -0,0 +1,217 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package server + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +const ( + // systemLogGCPeriod is the period for running gc on systemlog tables. + systemLogGCPeriod = 10 * time.Minute +) + +var ( + // rangeLogTTL is the TTL for rows in system.rangelog. If non zero, range log + // entries are periodically garbage collected. Defaults to 30 days. + rangeLogTTL = settings.RegisterDurationSetting( + "server.rangelog.ttl", + fmt.Sprintf( + "if nonzero, range log entries older than this duration are deleted every %s. "+ + "Should not be lowered below 24 hours", + systemLogGCPeriod, + ), + 30*24*time.Hour, // 30 days; matches the 720h0m0s default in the generated settings docs + ) + + // eventLogTTL is the TTL for rows in system.eventlog. If non zero, event log + // entries are periodically garbage collected. Defaults to 90 days. + eventLogTTL = settings.RegisterDurationSetting( + "server.eventlog.ttl", + fmt.Sprintf( + "if nonzero, event log entries older than this duration are deleted every %s. "+ + "Should not be lowered below 24 hours", + systemLogGCPeriod, + ), + 90*24*time.Hour, // 90 days; matches the 2160h0m0s default in the generated settings docs + ) +) + +// gcSystemLog deletes entries in the given system log table between +// timestampLowerBound and timestampUpperBound if the server is the lease holder +// for range 1. +// Leaseholder constraint is present so that only one node in the cluster +// performs gc. +// The system log table is expected to have a "timestamp" column. +// It returns the timestampLowerBound to be used in the next iteration, number +// of rows affected and error (if any). 
+func (s *Server) gcSystemLog( + ctx context.Context, table string, timestampLowerBound, timestampUpperBound time.Time, +) (time.Time, int64, error) { + var totalRowsAffected int64 + repl, err := s.node.stores.GetReplicaForRangeID(roachpb.RangeID(1)) + if err != nil { + return timestampLowerBound, 0, nil + } + + if !repl.IsFirstRange() || !repl.OwnsValidLease(s.clock.Now()) { + return timestampLowerBound, 0, nil + } + + deleteStmt := fmt.Sprintf( + `SELECT count(1), max(timestamp) FROM +[DELETE FROM system.%s WHERE timestamp >= $1 AND timestamp <= $2 LIMIT 1000 RETURNING timestamp]`, + table, + ) + + for { + var rowsAffected int64 + err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + var err error + row, err := s.internalExecutor.QueryRow( + ctx, + table+"-gc", + txn, + deleteStmt, + timestampLowerBound, + timestampUpperBound, + ) + if err != nil { + return err + } + + if row != nil { + rowCount, ok := row[0].(*tree.DInt) + if !ok { + return errors.Errorf("row count is of unknown type %T", row[0]) + } + if rowCount == nil { + return errors.New("error parsing row count") + } + rowsAffected = int64(*rowCount) + + if rowsAffected > 0 { + maxTimestamp, ok := row[1].(*tree.DTimestamp) + if !ok { + return errors.Errorf("timestamp is of unknown type %T", row[1]) + } + if maxTimestamp == nil { + return errors.New("error parsing timestamp") + } + timestampLowerBound = maxTimestamp.Time + } + } + return nil + }) + totalRowsAffected += rowsAffected + if err != nil { + return timestampLowerBound, totalRowsAffected, err + } + + if rowsAffected == 0 { + return timestampUpperBound, totalRowsAffected, nil + } + } +} + +// systemLogGCConfig has configurations for gc of systemlog. +type systemLogGCConfig struct { + // ttl is the time to live for rows in systemlog table. + ttl *settings.DurationSetting + // timestampLowerBound is the timestamp below which rows are gc'ed. 
+ // It is maintained to avoid hitting tombstones during gc and is updated + // after every gc run. + timestampLowerBound time.Time +} + +// startSystemLogsGC starts a worker which periodically GCs system.rangelog +// and system.eventlog. +// The TTLs for each of these logs is retrieved from cluster settings. +func (s *Server) startSystemLogsGC(ctx context.Context) { + systemLogsToGC := map[string]*systemLogGCConfig{ + "rangelog": { + ttl: rangeLogTTL, + timestampLowerBound: timeutil.Unix(0, 0), + }, + "eventlog": { + ttl: eventLogTTL, + timestampLowerBound: timeutil.Unix(0, 0), + }, + } + + s.stopper.RunWorker(ctx, func(ctx context.Context) { + period := systemLogGCPeriod + if storeKnobs, ok := s.cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs); ok && storeKnobs.SystemLogsGCPeriod != 0 { + period = storeKnobs.SystemLogsGCPeriod + } + + t := time.NewTicker(period) + defer t.Stop() + + for { + select { + case <-t.C: + for table, gcConfig := range systemLogsToGC { + ttl := gcConfig.ttl.Get(&s.cfg.Settings.SV) + if ttl > 0 { + timestampUpperBound := timeutil.Unix(0, s.clock.PhysicalNow()-int64(ttl)) + newTimestampLowerBound, rowsAffected, err := s.gcSystemLog( + ctx, + table, + gcConfig.timestampLowerBound, + timestampUpperBound, + ) + if err != nil { + log.Warningf( + ctx, + "error garbage collecting %s: %v", + table, + err, + ) + } else { + gcConfig.timestampLowerBound = newTimestampLowerBound + if log.V(1) { + log.Infof(ctx, "garbage collected %d rows from %s", rowsAffected, table) + } + } + } + } + + if storeKnobs, ok := s.cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs); ok && storeKnobs.SystemLogsGCGCDone != nil { + select { + case storeKnobs.SystemLogsGCGCDone <- struct{}{}: + case <-s.stopper.ShouldStop(): + // Test has finished. 
+ return + } + } + case <-s.stopper.ShouldStop(): + return + } + } + }) +} diff --git a/pkg/server/server_systemlog_gc_test.go b/pkg/server/server_systemlog_gc_test.go new file mode 100644 index 000000000000..5dc4c078f3fb --- /dev/null +++ b/pkg/server/server_systemlog_gc_test.go @@ -0,0 +1,224 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package server + +import ( + "context" + gosql "database/sql" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +func TestLogGC(t *testing.T) { + defer leaktest.AfterTest(t)() + a := assert.New(t) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + ts := s.(*TestServer) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + const testRangeID = 10001 + const table = "rangelog" + + rangeLogRowCount := func() int { + var count int + err := db.QueryRowContext(ctx, + `SELECT count(*) FROM system.rangelog WHERE "rangeID" = $1`, + testRangeID, + ).Scan(&count) + if err != nil { + t.Fatal(err) + } + return count + } + + logEvents := func(count int, timestamp time.Time) { + for i := 0; i < count; i++ { + _, err := db.Exec( 
+ `INSERT INTO system.rangelog ( + timestamp, "rangeID", "storeID", "eventType" + ) VALUES ( + $1, $2, $3, $4 + )`, + timestamp, + testRangeID, + 1, // storeID + storage.RangeLogEventType_add, + ) + a.NoError(err) + } + } + maxTS1 := timeutil.Now() + maxTS2 := maxTS1.Add(time.Second) + maxTS3 := maxTS2.Add(time.Second) + maxTS4 := maxTS3.Add(time.Second) + maxTS5 := maxTS4.Add(time.Hour) + maxTS6 := maxTS5.Add(time.Hour) + + // Assert 0 rows before inserting any events. + a.Equal(0, rangeLogRowCount()) + // Insert 100 events with timestamp of up to maxTS1. + logEvents(100, maxTS1) + a.Equal(100, rangeLogRowCount()) + // Insert 1 event with timestamp of up to maxTS2. + logEvents(1, maxTS2) + // Insert 49 events with timestamp of up to maxTS3. + logEvents(49, maxTS3) + a.Equal(150, rangeLogRowCount()) + // Insert 25 events with timestamp of up to maxTS4. + logEvents(25, maxTS4) + a.Equal(175, rangeLogRowCount()) + + // GC up to maxTS1. + tm, rowsGCd, err := ts.GCSystemLog(ctx, table, timeutil.Unix(0, 0), maxTS1) + a.NoError(err) + a.Equal(maxTS1, tm) + a.True(rowsGCd >= 100, "Expected rowsGCd >= 100, found %d", rowsGCd) + a.Equal(75, rangeLogRowCount()) + + // GC exactly maxTS2. + tm, rowsGCd, err = ts.GCSystemLog(ctx, table, maxTS2, maxTS2) + a.NoError(err) + a.Equal(maxTS2, tm) + a.True(rowsGCd >= 1, "Expected rowsGCd >= 1, found %d", rowsGCd) + a.Equal(74, rangeLogRowCount()) + + // GC up to maxTS3. + tm, rowsGCd, err = ts.GCSystemLog(ctx, table, maxTS1, maxTS3) + a.NoError(err) + a.Equal(maxTS3, tm) + a.True(rowsGCd >= 49, "Expected rowsGCd >= 49, found %d", rowsGCd) + a.Equal(25, rangeLogRowCount()) + // Insert 2000 more events. + logEvents(2000, maxTS5) + a.Equal(2025, rangeLogRowCount()) + + // GC up to maxTS4. + tm, rowsGCd, err = ts.GCSystemLog(ctx, table, maxTS2, maxTS4) + a.NoError(err) + a.Equal(maxTS4, tm) + a.True(rowsGCd >= 25, "Expected rowsGCd >= 25, found %d", rowsGCd) + a.Equal(2000, rangeLogRowCount()) + + // GC everything. 
+ tm, rowsGCd, err = ts.GCSystemLog(ctx, table, maxTS4, maxTS5) + a.NoError(err) + a.Equal(maxTS5, tm) + a.True(rowsGCd >= 2000, "Expected rowsGCd >= 2000, found %d", rowsGCd) + a.Equal(0, rangeLogRowCount()) + + // Ensure no errors when lowerBound > upperBound. + logEvents(5, maxTS6) + tm, rowsGCd, err = ts.GCSystemLog(ctx, table, maxTS6.Add(time.Hour), maxTS6) + a.NoError(err) + a.Equal(maxTS6, tm) + a.Equal(int64(0), rowsGCd) + a.Equal(5, rangeLogRowCount()) +} + +func TestLogGCTrigger(t *testing.T) { + defer leaktest.AfterTest(t)() + systemLogRowCount := func(ctx context.Context, db *gosql.DB, table string, ts time.Time) int { + var count int + err := db.QueryRowContext(ctx, + fmt.Sprintf(`SELECT count(*) FROM system.%s WHERE timestamp <= $1`, table), + ts, + ).Scan(&count) + if err != nil { + t.Fatal(err) + } + return count + } + + systemLogMaxTS := func(ctx context.Context, db *gosql.DB, table string) time.Time { + var time time.Time + err := db.QueryRowContext(ctx, + fmt.Sprintf(`SELECT timestamp FROM system.%s ORDER by timestamp DESC LIMIT 1`, table), + ).Scan(&time) + if err != nil { + t.Fatal(err) + } + return time + } + + testCases := []struct { + table string + setting *settings.DurationSetting + }{ + { + table: "rangelog", + setting: rangeLogTTL, + }, + { + table: "eventlog", + setting: eventLogTTL, + }, + } + + gcDone := make(chan struct{}) + storeKnobs := &storage.StoreTestingKnobs{ + SystemLogsGCGCDone: gcDone, + SystemLogsGCPeriod: time.Nanosecond, + } + + params := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: storeKnobs, + }, + } + + s, db, _ := serverutils.StartServer(t, params) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + for _, tc := range testCases { + t.Run(tc.table, func(t *testing.T) { + a := assert.New(t) + maxTS := systemLogMaxTS(ctx, db, tc.table) + a.NotEqual(systemLogRowCount(ctx, db, tc.table, maxTS), 0, "Expected non zero number of rows before %v", maxTS) + + // Reading gcDone once ensures that the 
previous gc is done + // (it could have finished long ago and still be waiting to send on this channel), + // and the next gc has started. + // Reading it twice guarantees that the next gc has also completed. + // Before running the assertions below one gc run has to be guaranteed. + <-gcDone + <-gcDone + a.NotEqual( + systemLogRowCount(ctx, db, tc.table, maxTS), + 0, + "Expected non zero number of events before %v as gc is not enabled", + maxTS, + ) + + _, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING server.%s.ttl='1ns'", tc.table)) + a.NoError(err) + time.Sleep(time.Second) + + <-gcDone + <-gcDone + a.Equal(0, systemLogRowCount(ctx, db, tc.table, maxTS), "Expected zero events before %v after gc", maxTS) + }) + } +} diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 241c75bfb64e..665089afd609 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -823,6 +823,20 @@ func (ts *TestServer) ExecutorConfig() interface{} { return *ts.execCfg } +// GCSystemLog deletes entries in the given system log table between +// timestampLowerBound and timestampUpperBound if the server is the lease holder +// for range 1. +// Leaseholder constraint is present so that only one node in the cluster +// performs gc. +// The system log table is expected to have a "timestamp" column. +// It returns the timestampLowerBound to be used in the next iteration, number +// of rows affected and error (if any). 
+func (ts *TestServer) GCSystemLog( + ctx context.Context, table string, timestampLowerBound, timestampUpperBound time.Time, +) (time.Time, int64, error) { + return ts.gcSystemLog(ctx, table, timestampLowerBound, timestampUpperBound) +} + type testServerFactoryImpl struct{} // TestServerFactory can be passed to serverutils.InitTestServerFactory diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 7a4db1c0e39e..65cfd3267da3 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -851,6 +851,10 @@ type StoreTestingKnobs struct { DisableLeaseCapacityGossip bool // BootstrapVersion overrides the version the stores will be bootstrapped with. BootstrapVersion *cluster.ClusterVersion + // SystemLogsGCPeriod is used to override the period of GC of system logs. + SystemLogsGCPeriod time.Duration + // SystemLogsGCGCDone is used to notify when system logs GC is done. + SystemLogsGCGCDone chan<- struct{} } var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}