From 440aa71ed51a73f348a3225940d7ae83b2cfeaf0 Mon Sep 17 00:00:00 2001 From: Andrii Vorobiov Date: Wed, 5 Oct 2022 20:16:22 +0300 Subject: [PATCH] server, tenant: hot ranges structured logging for all cluster types This patch introduces periodic logging of hot ranges info (ie QPS and relevant DB objects info). It is done for both dedicated and serverless cluster types, and by default called every 4h. Stats collection can be enabled/disabled, or changed interval through cluster settings. Release note: None --- docs/generated/eventlog.md | 28 ++++ pkg/BUILD.bazel | 4 + pkg/server/BUILD.bazel | 1 + pkg/server/server.go | 6 + pkg/server/structlogging/BUILD.bazel | 53 +++++++ pkg/server/structlogging/hot_ranges_log.go | 130 ++++++++++++++++++ .../structlogging/hot_ranges_log_test.go | 103 ++++++++++++++ pkg/server/structlogging/main_test.go | 33 +++++ pkg/server/tenant.go | 6 + pkg/sql/scheduledlogging/BUILD.bazel | 1 + .../captured_index_usage_stats.go | 28 +--- .../eventpb/eventlog_channels_generated.go | 3 + pkg/util/log/eventpb/json_encode_generated.go | 126 +++++++++++++++++ pkg/util/log/eventpb/telemetry.proto | 43 ++++++ pkg/util/log/logutil/BUILD.bazel | 2 + pkg/util/log/logutil/log_util.go | 26 ++++ 16 files changed, 567 insertions(+), 26 deletions(-) create mode 100644 pkg/server/structlogging/BUILD.bazel create mode 100644 pkg/server/structlogging/hot_ranges_log.go create mode 100644 pkg/server/structlogging/hot_ranges_log_test.go create mode 100644 pkg/server/structlogging/main_test.go diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 80ff066b6399..ed32afb96cfc 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -2802,6 +2802,34 @@ ChangefeedFailed events. | `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no | | `Format` | The data format being emitted (ex: JSON, Avro). | no | +### `hot_ranges_stats` + +An event of type `hot_ranges_stats` + + +| Field | Description | Sensitive | +|--|--|--| +| `RangeID` | | no | +| `Qps` | | no | +| `DatabaseName` | DatabaseName is the name of the database in which the index was created. | yes | +| `TableName` | TableName is the name of the table on which the index was created. | yes | +| `IndexName` | IndexName is the name of the index within the scope of the given table. | yes | +| `SchemaName` | SchemaName is the name of the schema in which the index was created. | yes | +| `LeaseholderNodeID` | LeaseholderNodeID indicates the Node ID that is the current leaseholder for the given range. | no | +| `WritesPerSecond` | Writes per second is the recent number of keys written per second on this range. | no | +| `ReadsPerSecond` | Reads per second is the recent number of keys read per second on this range. | no | +| `WriteBytesPerSecond` | Write bytes per second is the recent number of bytes written per second on this range. | no | +| `ReadBytesPerSecond` | Read bytes per second is the recent number of bytes read per second on this range. | no | +| `CPUTimePerSecond` | CPU time per second is the recent cpu usage in nanoseconds of this range. | no | + + +#### Common fields + +| Field | Description | Sensitive | +|--|--|--| +| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no | +| `EventType` | The type of the event. 
| no | + ### `recovery_event` An event of type `recovery_event` is an event that is logged on every invocation of BACKUP, diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 5df060b50923..6c616e7f7abe 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -293,6 +293,7 @@ ALL_TESTS = [ "//pkg/server/serverrules:serverrules_test", "//pkg/server/settingswatcher:settingswatcher_test", "//pkg/server/status:status_test", + "//pkg/server/structlogging:structlogging_test", "//pkg/server/systemconfigwatcher:systemconfigwatcher_test", "//pkg/server/telemetry:telemetry_test", "//pkg/server/tenantsettingswatcher:tenantsettingswatcher_test", @@ -1475,6 +1476,8 @@ GO_TARGETS = [ "//pkg/server/status/statuspb:statuspb", "//pkg/server/status:status", "//pkg/server/status:status_test", + "//pkg/server/structlogging:structlogging", + "//pkg/server/structlogging:structlogging_test", "//pkg/server/systemconfigwatcher/systemconfigwatchertest:systemconfigwatchertest", "//pkg/server/systemconfigwatcher:systemconfigwatcher", "//pkg/server/systemconfigwatcher:systemconfigwatcher_test", @@ -2863,6 +2866,7 @@ GET_X_DATA_TARGETS = [ "//pkg/server/settingswatcher:get_x_data", "//pkg/server/status:get_x_data", "//pkg/server/status/statuspb:get_x_data", + "//pkg/server/structlogging:get_x_data", "//pkg/server/systemconfigwatcher:get_x_data", "//pkg/server/systemconfigwatcher/systemconfigwatchertest:get_x_data", "//pkg/server/telemetry:get_x_data", diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index e9a4c736e423..c388cebc8a9b 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -174,6 +174,7 @@ go_library( "//pkg/server/settingswatcher", "//pkg/server/status", "//pkg/server/status/statuspb", + "//pkg/server/structlogging", "//pkg/server/systemconfigwatcher", "//pkg/server/telemetry", "//pkg/server/tenantsettingswatcher", diff --git a/pkg/server/server.go b/pkg/server/server.go index 572ba9c6d49c..28630b1a8d47 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -68,6 +68,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/serverrules" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/structlogging" "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/server/tenantsettingswatcher" @@ -102,6 +103,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/goschedstats" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/netutil" @@ -2053,6 +2055,10 @@ func (s *Server) AcceptClients(ctx context.Context) error { return err } + if logcrash.DiagnosticsReportingEnabled.Get(&s.ClusterSettings().SV) { + structlogging.StartHotRangesLoggingScheduler(ctx, s.stopper, s.status, *s.sqlServer.internalExecutor, s.ClusterSettings()) + } + s.sqlServer.isReady.Set(true) log.Event(ctx, "server ready") diff --git a/pkg/server/structlogging/BUILD.bazel b/pkg/server/structlogging/BUILD.bazel new file mode 100644 index 000000000000..3c1f6b034ad1 --- /dev/null +++ b/pkg/server/structlogging/BUILD.bazel @@ -0,0 +1,53 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + 
+go_library( + name = "structlogging", + srcs = ["hot_ranges_log.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/structlogging", + visibility = ["//visibility:public"], + deps = [ + "//pkg/server/serverpb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql", + "//pkg/util/log", + "//pkg/util/log/eventpb", + "//pkg/util/log/logpb", + "//pkg/util/log/logutil", + "//pkg/util/stop", + "//pkg/util/timeutil", + ], +) + +go_test( + name = "structlogging_test", + srcs = [ + "hot_ranges_log_test.go", + "main_test.go", + ], + args = ["-test.timeout=295s"], + deps = [ + ":structlogging", + "//pkg/base", + "//pkg/ccl", + "//pkg/kv/kvserver", + "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/server/serverpb", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/log/logcrash", + "//pkg/util/log/logtestutils", + "//pkg/util/randutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/server/structlogging/hot_ranges_log.go b/pkg/server/structlogging/hot_ranges_log.go new file mode 100644 index 000000000000..3c17e64200c1 --- /dev/null +++ b/pkg/server/structlogging/hot_ranges_log.go @@ -0,0 +1,130 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package structlogging + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// ReportTopHottestRanges limits the number of ranges to be reported per iteration +const ReportTopHottestRanges = 5 + +var TelemetryHotRangesStatsInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "server.telemetry.hot_ranges_stats.interval", + "the time interval to log hot ranges stats", + 4*time.Hour, + settings.NonNegativeDuration, +) + +var TelemetryHotRangesStatsEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "server.telemetry.hot_ranges_stats.enabled", + "enable/disable capturing hot ranges statistics to the telemetry logging channel", + true, +) + +var TelemetryHotRangesStatsLoggingDelay = settings.RegisterDurationSetting( + settings.TenantWritable, + "server.telemetry.hot_ranges_stats.logging_delay", + "the time delay between emitting individual hot ranges stats logs", + 1*time.Second, + settings.NonNegativeDuration, +) + +// hotRangesLoggingScheduler is responsible for logging index usage stats +// on a scheduled interval. 
+type hotRangesLoggingScheduler struct { + ie sql.InternalExecutor + sServer serverpb.TenantStatusServer + st *cluster.Settings +} + +// StartHotRangesLoggingScheduler starts the capture index usage statistics logging scheduler. +func StartHotRangesLoggingScheduler( + ctx context.Context, + stopper *stop.Stopper, + sServer serverpb.TenantStatusServer, + ie sql.InternalExecutor, + st *cluster.Settings, +) { + scheduler := hotRangesLoggingScheduler{ + ie: ie, + sServer: sServer, + st: st, + } + scheduler.start(ctx, stopper) +} + +func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Stopper) { + _ = stopper.RunAsyncTask(ctx, "hot-ranges-stats", func(ctx context.Context) { + ticker := time.NewTicker(TelemetryHotRangesStatsInterval.Get(&s.st.SV)) + defer ticker.Stop() + + TelemetryHotRangesStatsInterval.SetOnChange(&s.st.SV, func(ctx context.Context) { + ticker.Reset(TelemetryHotRangesStatsInterval.Get(&s.st.SV)) + }) + + for { + select { + case <-stopper.ShouldQuiesce(): + return + case <-ctx.Done(): + return + case <-ticker.C: + if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) { + continue + } + resp, err := s.sServer.HotRangesV2(ctx, &serverpb.HotRangesRequest{PageSize: ReportTopHottestRanges}) + if err != nil { + log.Warningf(ctx, "failed to get hot ranges: %s", err) + continue + } + var events []logpb.EventPayload + ts := timeutil.Now().UnixNano() + + for _, r := range resp.Ranges { + hrEvent := &eventpb.HotRangesStats{ + RangeID: int64(r.RangeID), + Qps: r.QPS, + DatabaseName: r.DatabaseName, + SchemaName: r.SchemaName, + TableName: r.TableName, + IndexName: r.IndexName, + CPUTimePerSecond: r.CPUTimePerSecond, + ReadBytesPerSecond: r.ReadBytesPerSecond, + WriteBytesPerSecond: r.WriteBytesPerSecond, + ReadsPerSecond: r.ReadsPerSecond, + WritesPerSecond: r.WritesPerSecond, + LeaseholderNodeID: int32(r.LeaseholderNodeID), + CommonEventDetails: logpb.CommonEventDetails{ + Timestamp: ts, + }, + } + events = append(events, hrEvent) + } + logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV)) + } + } + }) +} diff --git a/pkg/server/structlogging/hot_ranges_log_test.go b/pkg/server/structlogging/hot_ranges_log_test.go new file mode 100644 index 000000000000..3f1982b2ec67 --- /dev/null +++ b/pkg/server/structlogging/hot_ranges_log_test.go @@ -0,0 +1,103 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package structlogging_test + +import ( + "context" + "math" + "regexp" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/server/structlogging" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" + "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestHotRangesStats(t *testing.T) { + ctx := context.Background() + defer leaktest.AfterTest(t)() + ccl.TestingEnableEnterprise() + defer ccl.TestingDisableEnterprise() + sc := log.ScopeWithoutShowLogs(t) + defer sc.Close(t) + + cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) + defer cleanup() + + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestTenantDisabled, + StoreSpecs: []base.StoreSpec{ + base.DefaultTestStoreSpec, + base.DefaultTestStoreSpec, + base.DefaultTestStoreSpec, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableReplicaRebalancing: true, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + logcrash.DiagnosticsReportingEnabled.Override(ctx, &s.ClusterSettings().SV, true) + structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &s.ClusterSettings().SV, true) + structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Millisecond) + structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond) + + tenantID := roachpb.MustMakeTenantID(2) + tt, err := s.StartTenant(ctx, base.TestTenantArgs{ + TenantID: tenantID, + Settings: s.ClusterSettings(), + }) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + ss := tt.TenantStatusServer().(serverpb.TenantStatusServer) + resp, err := ss.HotRangesV2(ctx, &serverpb.HotRangesRequest{TenantID: tenantID.String()}) + if err != nil { + return err + } + if len(resp.Ranges) == 0 { + return errors.New("waiting for hot ranges to be collected") + } + return nil + }) + + testutils.SucceedsWithin(t, func() error { + log.Flush() + entries, err := log.FetchEntriesFromFiles( + 0, + math.MaxInt64, + 10000, + regexp.MustCompile(`"EventType":"hot_ranges_stats"`), + log.WithMarkedSensitiveData, + ) + if err != nil { + return err + } + if len(entries) == 0 { + return errors.New("waiting for logs") + } + return nil + }, 5*time.Second) +} diff --git a/pkg/server/structlogging/main_test.go b/pkg/server/structlogging/main_test.go new file mode 100644 index 000000000000..80650e38f6e0 --- /dev/null +++ b/pkg/server/structlogging/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package structlogging_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer ccl.TestingEnableEnterprise()() + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index ed9f50935563..6c66123d719a 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/debug" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/structlogging" "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -64,6 +65,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/schedulerlatency" @@ -847,6 +849,10 @@ func (s *SQLServerWrapper) AcceptClients(ctx context.Context) error { } } + if logcrash.DiagnosticsReportingEnabled.Get(&s.ClusterSettings().SV) { + structlogging.StartHotRangesLoggingScheduler(ctx, s.stopper, s.sqlServer.tenantConnect, *s.sqlServer.internalExecutor, s.ClusterSettings()) + } + s.sqlServer.isReady.Set(true) log.Event(ctx, "server ready") diff --git a/pkg/sql/scheduledlogging/BUILD.bazel b/pkg/sql/scheduledlogging/BUILD.bazel index c37f534a5d62..dcb8a64b4297 100644 --- a/pkg/sql/scheduledlogging/BUILD.bazel +++ b/pkg/sql/scheduledlogging/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/log/logpb", + "//pkg/util/log/logutil", "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats.go b/pkg/sql/scheduledlogging/captured_index_usage_stats.go index b349e022a4e9..3e3cde9c09ff 100644 --- a/pkg/sql/scheduledlogging/captured_index_usage_stats.go +++ b/pkg/sql/scheduledlogging/captured_index_usage_stats.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -269,35 +270,10 @@ ORDER BY total_reads ASC` return err } } - logIndexUsageStatsWithDelay(ctx, allCapturedIndexUsageStats, stopper, loggingDelay) + logutil.LogEventsWithDelay(ctx, allCapturedIndexUsageStats, stopper, loggingDelay) return nil } -// logIndexUsageStatsWithDelay logs an eventpb.EventPayload at each -// 
telemetryCaptureIndexUsageStatsLoggingDelay to avoid exceeding the 10 -// log-line per second limit per node on the telemetry logging pipeline. -// Currently, this log-line limit is only shared with 1 other telemetry event, -// SampledQuery, which now has a logging frequency of 8 logs per second. -func logIndexUsageStatsWithDelay( - ctx context.Context, events []logpb.EventPayload, stopper *stop.Stopper, delay time.Duration, -) { - // Log the first event immediately. - timer := time.NewTimer(0 * time.Second) - defer timer.Stop() - for len(events) > 0 { - select { - case <-stopper.ShouldQuiesce(): - return - case <-timer.C: - event := events[0] - log.StructuredEvent(ctx, event) - events = events[1:] - // Apply a delay to subsequent events. - timer.Reset(delay) - } - } -} - func getAllDatabaseNames(ctx context.Context, ie isql.Executor) (tree.NameList, error) { var allDatabaseNames tree.NameList var ok bool diff --git a/pkg/util/log/eventpb/eventlog_channels_generated.go b/pkg/util/log/eventpb/eventlog_channels_generated.go index 602d18f3e09b..067e9054d7ed 100644 --- a/pkg/util/log/eventpb/eventlog_channels_generated.go +++ b/pkg/util/log/eventpb/eventlog_channels_generated.go @@ -325,6 +325,9 @@ func (m *ChangefeedFailed) LoggingChannel() logpb.Channel { return logpb.Channel // LoggingChannel implements the EventPayload interface. func (m *CreateChangefeed) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY } +// LoggingChannel implements the EventPayload interface. +func (m *HotRangesStats) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY } + // LoggingChannel implements the EventPayload interface. func (m *RecoveryEvent) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY } diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index ec1eb639fba3..8d22e58012b3 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -3054,6 +3054,132 @@ func (m *GrantRole) AppendJSONFields(printComma bool, b redact.RedactableBytes) return printComma, b } +// AppendJSONFields implements the EventPayload interface. +func (m *HotRangesStats) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { + + printComma, b = m.CommonEventDetails.AppendJSONFields(printComma, b) + + if m.RangeID != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"RangeID\":"...) + b = strconv.AppendInt(b, int64(m.RangeID), 10) + } + + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"Qps\":"...) + b = strconv.AppendFloat(b, float64(m.Qps), 'f', -1, 64) + + if m.DatabaseName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"DatabaseName\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.DatabaseName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.TableName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"TableName\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.TableName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.IndexName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"IndexName\":\""...) 
+ b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.IndexName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.SchemaName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"SchemaName\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.SchemaName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.LeaseholderNodeID != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"LeaseholderNodeID\":"...) + b = strconv.AppendInt(b, int64(m.LeaseholderNodeID), 10) + } + + if m.WritesPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"WritesPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.WritesPerSecond), 'f', -1, 64) + } + + if m.ReadsPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ReadsPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.ReadsPerSecond), 'f', -1, 64) + } + + if m.WriteBytesPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"WriteBytesPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.WriteBytesPerSecond), 'f', -1, 64) + } + + if m.ReadBytesPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ReadBytesPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.ReadBytesPerSecond), 'f', -1, 64) + } + + if m.CPUTimePerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"CPUTimePerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.CPUTimePerSecond), 'f', -1, 64) + } + + return printComma, b +} + // AppendJSONFields implements the EventPayload interface. func (m *Import) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index d6cac6cc507b..887da355d6b2 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -446,3 +446,46 @@ message SchemaSnapshotMetadata { // which includes the redaction of any potential PII. repeated string errors = 5 [(gogoproto.jsontag) = ",omitempty"]; } + +// HotRangesStats +message HotRangesStats { + CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; + + int64 range_id = 2 [(gogoproto.jsontag) = ",omitempty", (gogoproto.customname) = "RangeID"]; + + double qps = 3 [(gogoproto.jsontag) = ",includeempty"]; + + // DatabaseName is the name of the database in which the index was created. + string database_name = 4 [(gogoproto.jsontag) = ",omitempty"]; + + // TableName is the name of the table on which the index was created. + string table_name = 5 [(gogoproto.jsontag) = ",omitempty"]; + + // IndexName is the name of the index within the scope of the given table. + string index_name = 6 [(gogoproto.jsontag) = ",omitempty"]; + + // SchemaName is the name of the schema in which the index was created. + string schema_name = 7 [(gogoproto.jsontag) = ",omitempty"]; + + // LeaseholderNodeID indicates the Node ID that is the current leaseholder for the given range. 
+ int32 leaseholder_node_id = 8 [(gogoproto.customname) = "LeaseholderNodeID", (gogoproto.jsontag) = ",omitempty"]; + + // Writes per second is the recent number of keys written per second on + // this range. + double writes_per_second = 9 [(gogoproto.jsontag) = ",omitempty"]; + + // Reads per second is the recent number of keys read per second on + // this range. + double reads_per_second = 10 [(gogoproto.jsontag) = ",omitempty"]; + + // Write bytes per second is the recent number of bytes written per second on + // this range. + double write_bytes_per_second = 11 [(gogoproto.jsontag) = ",omitempty"]; + + // Read bytes per second is the recent number of bytes read per second on + // this range. + double read_bytes_per_second = 12 [(gogoproto.jsontag) = ",omitempty"]; + + // CPU time per second is the recent cpu usage in nanoseconds of this range. + double cpu_time_per_second = 13 [(gogoproto.customname) = "CPUTimePerSecond", (gogoproto.jsontag) = ",omitempty"]; +} diff --git a/pkg/util/log/logutil/BUILD.bazel b/pkg/util/log/logutil/BUILD.bazel index adf86971dd9e..92653761ac17 100644 --- a/pkg/util/log/logutil/BUILD.bazel +++ b/pkg/util/log/logutil/BUILD.bazel @@ -11,6 +11,8 @@ go_library( "//pkg/jobs/jobspb", "//pkg/util/log", "//pkg/util/log/eventpb", + "//pkg/util/log/logpb", + "//pkg/util/stop", "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/util/log/logutil/log_util.go b/pkg/util/log/logutil/log_util.go index 884cf8e36386..59bc6ec65a9e 100644 --- a/pkg/util/log/logutil/log_util.go +++ b/pkg/util/log/logutil/log_util.go @@ -12,11 +12,14 @@ package logutil import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/redact" ) @@ -53,3 +56,26 @@ func LogJobCompletion( log.StructuredEvent(ctx, event) } + +// LogEventsWithDelay logs an eventpb.EventPayload at provided +// delay duration to avoid exceeding the 10 log-line per second limit per node +// on the telemetry logging pipeline. +func LogEventsWithDelay( + ctx context.Context, events []logpb.EventPayload, stopper *stop.Stopper, delay time.Duration, +) { + // Log the first event immediately. + timer := time.NewTimer(0 * time.Second) + defer timer.Stop() + for len(events) > 0 { + select { + case <-stopper.ShouldQuiesce(): + return + case <-timer.C: + event := events[0] + log.StructuredEvent(ctx, event) + events = events[1:] + // Apply a delay to subsequent events. + timer.Reset(delay) + } + } +}
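
Usage note (illustrative, not part of the patch): the scheduler above is controlled by the three cluster settings registered in hot_ranges_log.go. Below is a minimal Go sketch, in the style of hot_ranges_log_test.go, of overriding them so the scheduler fires more often than the 4h default; the helper name tuneHotRangesLogging and the chosen values are assumptions for illustration, while the setting variables and their Override signatures come from the patch itself.

package structlogging_test // hypothetical helper, would live in a _test.go file

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/server/structlogging"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// tuneHotRangesLogging enables hot ranges stats logging and shortens both
// the collection interval (default 4h) and the per-event logging delay
// (default 1s) on the given cluster settings.
func tuneHotRangesLogging(ctx context.Context, st *cluster.Settings) {
	structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &st.SV, true)
	structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &st.SV, 30*time.Minute)
	structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &st.SV, 100*time.Millisecond)
}

Even with these overrides, StartHotRangesLoggingScheduler is only started when logcrash.DiagnosticsReportingEnabled is set (see the AcceptClients hooks in server.go and tenant.go), each iteration reports at most ReportTopHottestRanges (5) ranges from HotRangesV2, and the resulting hot_ranges_stats events are emitted to the TELEMETRY channel via logutil.LogEventsWithDelay.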