diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 80ff066b6399..ed32afb96cfc 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -2802,6 +2802,34 @@ ChangefeedFailed events. | `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no | | `Format` | The data format being emitted (ex: JSON, Avro). | no | +### `hot_ranges_stats` + +An event of type `hot_ranges_stats` + + +| Field | Description | Sensitive | +|--|--|--| +| `RangeID` | | no | +| `Qps` | | no | +| `DatabaseName` | DatabaseName is the name of the database in which the index was created. | yes | +| `TableName` | TableName is the name of the table on which the index was created. | yes | +| `IndexName` | IndexName is the name of the index within the scope of the given table. | yes | +| `SchemaName` | SchemaName is the name of the schema in which the index was created. | yes | +| `LeaseholderNodeID` | LeaseholderNodeID indicates the Node ID that is the current leaseholder for the given range. | no | +| `WritesPerSecond` | Writes per second is the recent number of keys written per second on this range. | no | +| `ReadsPerSecond` | Reads per second is the recent number of keys read per second on this range. | no | +| `WriteBytesPerSecond` | Write bytes per second is the recent number of bytes written per second on this range. | no | +| `ReadBytesPerSecond` | Read bytes per second is the recent number of bytes read per second on this range. | no | +| `CPUTimePerSecond` | CPU time per second is the recent cpu usage in nanoseconds of this range. | no | + + +#### Common fields + +| Field | Description | Sensitive | +|--|--|--| +| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no | +| `EventType` | The type of the event. 
| no | + ### `recovery_event` An event of type `recovery_event` is an event that is logged on every invocation of BACKUP, diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 8be6c838e6d9..dd9db07b448f 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -293,6 +293,7 @@ ALL_TESTS = [ "//pkg/server/serverrules:serverrules_test", "//pkg/server/settingswatcher:settingswatcher_test", "//pkg/server/status:status_test", + "//pkg/server/structlogging:structlogging_test", "//pkg/server/systemconfigwatcher:systemconfigwatcher_test", "//pkg/server/telemetry:telemetry_test", "//pkg/server/tenantsettingswatcher:tenantsettingswatcher_test", @@ -1474,6 +1475,8 @@ GO_TARGETS = [ "//pkg/server/status/statuspb:statuspb", "//pkg/server/status:status", "//pkg/server/status:status_test", + "//pkg/server/structlogging:structlogging", + "//pkg/server/structlogging:structlogging_test", "//pkg/server/systemconfigwatcher/systemconfigwatchertest:systemconfigwatchertest", "//pkg/server/systemconfigwatcher:systemconfigwatcher", "//pkg/server/systemconfigwatcher:systemconfigwatcher_test", @@ -2862,6 +2865,7 @@ GET_X_DATA_TARGETS = [ "//pkg/server/settingswatcher:get_x_data", "//pkg/server/status:get_x_data", "//pkg/server/status/statuspb:get_x_data", + "//pkg/server/structlogging:get_x_data", "//pkg/server/systemconfigwatcher:get_x_data", "//pkg/server/systemconfigwatcher/systemconfigwatchertest:get_x_data", "//pkg/server/telemetry:get_x_data", diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index e9a4c736e423..c388cebc8a9b 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -174,6 +174,7 @@ go_library( "//pkg/server/settingswatcher", "//pkg/server/status", "//pkg/server/status/statuspb", + "//pkg/server/structlogging", "//pkg/server/systemconfigwatcher", "//pkg/server/telemetry", "//pkg/server/tenantsettingswatcher", diff --git a/pkg/server/server.go b/pkg/server/server.go index 572ba9c6d49c..28630b1a8d47 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -68,6 +68,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/serverrules" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/structlogging" "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/server/tenantsettingswatcher" @@ -102,6 +103,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/goschedstats" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/netutil" @@ -2053,6 +2055,10 @@ func (s *Server) AcceptClients(ctx context.Context) error { return err } + if logcrash.DiagnosticsReportingEnabled.Get(&s.ClusterSettings().SV) { + structlogging.StartHotRangesLoggingScheduler(ctx, s.stopper, s.status, *s.sqlServer.internalExecutor, s.ClusterSettings()) + } + s.sqlServer.isReady.Set(true) log.Event(ctx, "server ready") diff --git a/pkg/server/structlogging/BUILD.bazel b/pkg/server/structlogging/BUILD.bazel new file mode 100644 index 000000000000..3c1f6b034ad1 --- /dev/null +++ b/pkg/server/structlogging/BUILD.bazel @@ -0,0 +1,53 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + 
+go_library(
+    name = "structlogging",
+    srcs = ["hot_ranges_log.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/server/structlogging",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/server/serverpb",
+        "//pkg/settings",
+        "//pkg/settings/cluster",
+        "//pkg/sql",
+        "//pkg/util/log",
+        "//pkg/util/log/eventpb",
+        "//pkg/util/log/logpb",
+        "//pkg/util/log/logutil",
+        "//pkg/util/stop",
+        "//pkg/util/timeutil",
+    ],
+)
+
+go_test(
+    name = "structlogging_test",
+    srcs = [
+        "hot_ranges_log_test.go",
+        "main_test.go",
+    ],
+    args = ["-test.timeout=295s"],
+    deps = [
+        ":structlogging",
+        "//pkg/base",
+        "//pkg/ccl",
+        "//pkg/kv/kvserver",
+        "//pkg/roachpb",
+        "//pkg/security/securityassets",
+        "//pkg/security/securitytest",
+        "//pkg/server",
+        "//pkg/server/serverpb",
+        "//pkg/testutils",
+        "//pkg/testutils/serverutils",
+        "//pkg/testutils/testcluster",
+        "//pkg/util/leaktest",
+        "//pkg/util/log",
+        "//pkg/util/log/logcrash",
+        "//pkg/util/log/logtestutils",
+        "//pkg/util/randutil",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_stretchr_testify//require",
+    ],
+)
+
+get_x_data(name = "get_x_data")
diff --git a/pkg/server/structlogging/hot_ranges_log.go b/pkg/server/structlogging/hot_ranges_log.go
new file mode 100644
index 000000000000..3c17e64200c1
--- /dev/null
+++ b/pkg/server/structlogging/hot_ranges_log.go
@@ -0,0 +1,130 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package structlogging
+
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logutil"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+)
+
+// ReportTopHottestRanges limits the number of ranges to be reported per iteration.
+const ReportTopHottestRanges = 5
+
+var TelemetryHotRangesStatsInterval = settings.RegisterDurationSetting(
+	settings.TenantWritable,
+	"server.telemetry.hot_ranges_stats.interval",
+	"the time interval to log hot ranges stats",
+	4*time.Hour,
+	settings.NonNegativeDuration,
+)
+
+var TelemetryHotRangesStatsEnabled = settings.RegisterBoolSetting(
+	settings.TenantWritable,
+	"server.telemetry.hot_ranges_stats.enabled",
+	"enable/disable capturing hot ranges statistics to the telemetry logging channel",
+	true,
+)
+
+var TelemetryHotRangesStatsLoggingDelay = settings.RegisterDurationSetting(
+	settings.TenantWritable,
+	"server.telemetry.hot_ranges_stats.logging_delay",
+	"the time delay between emitting individual hot ranges stats logs",
+	1*time.Second,
+	settings.NonNegativeDuration,
+)
+
+// hotRangesLoggingScheduler is responsible for logging hot ranges stats
+// on a scheduled interval.
+type hotRangesLoggingScheduler struct {
+	ie      sql.InternalExecutor
+	sServer serverpb.TenantStatusServer
+	st      *cluster.Settings
+}
+
+// StartHotRangesLoggingScheduler starts the hot ranges stats logging scheduler.
+func StartHotRangesLoggingScheduler(
+	ctx context.Context,
+	stopper *stop.Stopper,
+	sServer serverpb.TenantStatusServer,
+	ie sql.InternalExecutor,
+	st *cluster.Settings,
+) {
+	scheduler := hotRangesLoggingScheduler{
+		ie:      ie,
+		sServer: sServer,
+		st:      st,
+	}
+	scheduler.start(ctx, stopper)
+}
+
+func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Stopper) {
+	_ = stopper.RunAsyncTask(ctx, "hot-ranges-stats", func(ctx context.Context) {
+		ticker := time.NewTicker(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
+		defer ticker.Stop()
+
+		TelemetryHotRangesStatsInterval.SetOnChange(&s.st.SV, func(ctx context.Context) {
+			ticker.Reset(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
+		})
+
+		for {
+			select {
+			case <-stopper.ShouldQuiesce():
+				return
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
+					continue
+				}
+				resp, err := s.sServer.HotRangesV2(ctx, &serverpb.HotRangesRequest{PageSize: ReportTopHottestRanges})
+				if err != nil {
+					log.Warningf(ctx, "failed to get hot ranges: %s", err)
+					continue
+				}
+				var events []logpb.EventPayload
+				ts := timeutil.Now().UnixNano()
+
+				for _, r := range resp.Ranges {
+					hrEvent := &eventpb.HotRangesStats{
+						RangeID:             int64(r.RangeID),
+						Qps:                 r.QPS,
+						DatabaseName:        r.DatabaseName,
+						SchemaName:          r.SchemaName,
+						TableName:           r.TableName,
+						IndexName:           r.IndexName,
+						CPUTimePerSecond:    r.CPUTimePerSecond,
+						ReadBytesPerSecond:  r.ReadBytesPerSecond,
+						WriteBytesPerSecond: r.WriteBytesPerSecond,
+						ReadsPerSecond:      r.ReadsPerSecond,
+						WritesPerSecond:     r.WritesPerSecond,
+						LeaseholderNodeID:   int32(r.LeaseholderNodeID),
+						CommonEventDetails: logpb.CommonEventDetails{
+							Timestamp: ts,
+						},
+					}
+					events = append(events, hrEvent)
+				}
+				logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))
+			}
+		}
+	})
+}
diff --git a/pkg/server/structlogging/hot_ranges_log_test.go b/pkg/server/structlogging/hot_ranges_log_test.go
new file mode 100644
index 000000000000..3f1982b2ec67
--- /dev/null
+++ b/pkg/server/structlogging/hot_ranges_log_test.go
@@ -0,0 +1,103 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+ +package structlogging_test + +import ( + "context" + "math" + "regexp" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/server/structlogging" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" + "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestHotRangesStats(t *testing.T) { + ctx := context.Background() + defer leaktest.AfterTest(t)() + ccl.TestingEnableEnterprise() + defer ccl.TestingDisableEnterprise() + sc := log.ScopeWithoutShowLogs(t) + defer sc.Close(t) + + cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) + defer cleanup() + + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestTenantDisabled, + StoreSpecs: []base.StoreSpec{ + base.DefaultTestStoreSpec, + base.DefaultTestStoreSpec, + base.DefaultTestStoreSpec, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableReplicaRebalancing: true, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + logcrash.DiagnosticsReportingEnabled.Override(ctx, &s.ClusterSettings().SV, true) + structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &s.ClusterSettings().SV, true) + structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Millisecond) + structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond) + + tenantID := roachpb.MustMakeTenantID(2) + tt, err := s.StartTenant(ctx, base.TestTenantArgs{ + TenantID: tenantID, + Settings: s.ClusterSettings(), + }) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + ss := tt.TenantStatusServer().(serverpb.TenantStatusServer) + resp, err := ss.HotRangesV2(ctx, &serverpb.HotRangesRequest{TenantID: tenantID.String()}) + if err != nil { + return err + } + if len(resp.Ranges) == 0 { + return errors.New("waiting for hot ranges to be collected") + } + return nil + }) + + testutils.SucceedsWithin(t, func() error { + log.Flush() + entries, err := log.FetchEntriesFromFiles( + 0, + math.MaxInt64, + 10000, + regexp.MustCompile(`"EventType":"hot_ranges_stats"`), + log.WithMarkedSensitiveData, + ) + if err != nil { + return err + } + if len(entries) == 0 { + return errors.New("waiting for logs") + } + return nil + }, 5*time.Second) +} diff --git a/pkg/server/structlogging/main_test.go b/pkg/server/structlogging/main_test.go new file mode 100644 index 000000000000..80650e38f6e0 --- /dev/null +++ b/pkg/server/structlogging/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package structlogging_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer ccl.TestingEnableEnterprise()() + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index ed9f50935563..6c66123d719a 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/debug" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/structlogging" "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -64,6 +65,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/schedulerlatency" @@ -847,6 +849,10 @@ func (s *SQLServerWrapper) AcceptClients(ctx context.Context) error { } } + if logcrash.DiagnosticsReportingEnabled.Get(&s.ClusterSettings().SV) { + structlogging.StartHotRangesLoggingScheduler(ctx, s.stopper, s.sqlServer.tenantConnect, *s.sqlServer.internalExecutor, s.ClusterSettings()) + } + s.sqlServer.isReady.Set(true) log.Event(ctx, "server ready") diff --git a/pkg/sql/scheduledlogging/BUILD.bazel b/pkg/sql/scheduledlogging/BUILD.bazel index c37f534a5d62..dcb8a64b4297 100644 --- a/pkg/sql/scheduledlogging/BUILD.bazel +++ b/pkg/sql/scheduledlogging/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/log/logpb", + "//pkg/util/log/logutil", "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats.go b/pkg/sql/scheduledlogging/captured_index_usage_stats.go index b349e022a4e9..3e3cde9c09ff 100644 --- a/pkg/sql/scheduledlogging/captured_index_usage_stats.go +++ b/pkg/sql/scheduledlogging/captured_index_usage_stats.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -269,35 +270,10 @@ ORDER BY total_reads ASC` return err } } - logIndexUsageStatsWithDelay(ctx, allCapturedIndexUsageStats, stopper, loggingDelay) + logutil.LogEventsWithDelay(ctx, allCapturedIndexUsageStats, stopper, loggingDelay) return nil } -// logIndexUsageStatsWithDelay logs an eventpb.EventPayload at each -// 
telemetryCaptureIndexUsageStatsLoggingDelay to avoid exceeding the 10 -// log-line per second limit per node on the telemetry logging pipeline. -// Currently, this log-line limit is only shared with 1 other telemetry event, -// SampledQuery, which now has a logging frequency of 8 logs per second. -func logIndexUsageStatsWithDelay( - ctx context.Context, events []logpb.EventPayload, stopper *stop.Stopper, delay time.Duration, -) { - // Log the first event immediately. - timer := time.NewTimer(0 * time.Second) - defer timer.Stop() - for len(events) > 0 { - select { - case <-stopper.ShouldQuiesce(): - return - case <-timer.C: - event := events[0] - log.StructuredEvent(ctx, event) - events = events[1:] - // Apply a delay to subsequent events. - timer.Reset(delay) - } - } -} - func getAllDatabaseNames(ctx context.Context, ie isql.Executor) (tree.NameList, error) { var allDatabaseNames tree.NameList var ok bool diff --git a/pkg/util/log/eventpb/eventlog_channels_generated.go b/pkg/util/log/eventpb/eventlog_channels_generated.go index 602d18f3e09b..067e9054d7ed 100644 --- a/pkg/util/log/eventpb/eventlog_channels_generated.go +++ b/pkg/util/log/eventpb/eventlog_channels_generated.go @@ -325,6 +325,9 @@ func (m *ChangefeedFailed) LoggingChannel() logpb.Channel { return logpb.Channel // LoggingChannel implements the EventPayload interface. func (m *CreateChangefeed) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY } +// LoggingChannel implements the EventPayload interface. +func (m *HotRangesStats) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY } + // LoggingChannel implements the EventPayload interface. func (m *RecoveryEvent) LoggingChannel() logpb.Channel { return logpb.Channel_TELEMETRY } diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index ec1eb639fba3..8d22e58012b3 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -3054,6 +3054,132 @@ func (m *GrantRole) AppendJSONFields(printComma bool, b redact.RedactableBytes) return printComma, b } +// AppendJSONFields implements the EventPayload interface. +func (m *HotRangesStats) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { + + printComma, b = m.CommonEventDetails.AppendJSONFields(printComma, b) + + if m.RangeID != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"RangeID\":"...) + b = strconv.AppendInt(b, int64(m.RangeID), 10) + } + + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"Qps\":"...) + b = strconv.AppendFloat(b, float64(m.Qps), 'f', -1, 64) + + if m.DatabaseName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"DatabaseName\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.DatabaseName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.TableName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"TableName\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.TableName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.IndexName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"IndexName\":\""...) 
+ b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.IndexName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.SchemaName != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"SchemaName\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.SchemaName))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.LeaseholderNodeID != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"LeaseholderNodeID\":"...) + b = strconv.AppendInt(b, int64(m.LeaseholderNodeID), 10) + } + + if m.WritesPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"WritesPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.WritesPerSecond), 'f', -1, 64) + } + + if m.ReadsPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ReadsPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.ReadsPerSecond), 'f', -1, 64) + } + + if m.WriteBytesPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"WriteBytesPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.WriteBytesPerSecond), 'f', -1, 64) + } + + if m.ReadBytesPerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ReadBytesPerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.ReadBytesPerSecond), 'f', -1, 64) + } + + if m.CPUTimePerSecond != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"CPUTimePerSecond\":"...) + b = strconv.AppendFloat(b, float64(m.CPUTimePerSecond), 'f', -1, 64) + } + + return printComma, b +} + // AppendJSONFields implements the EventPayload interface. func (m *Import) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index d6cac6cc507b..887da355d6b2 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -446,3 +446,46 @@ message SchemaSnapshotMetadata { // which includes the redaction of any potential PII. repeated string errors = 5 [(gogoproto.jsontag) = ",omitempty"]; } + +// HotRangesStats +message HotRangesStats { + CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; + + int64 range_id = 2 [(gogoproto.jsontag) = ",omitempty", (gogoproto.customname) = "RangeID"]; + + double qps = 3 [(gogoproto.jsontag) = ",includeempty"]; + + // DatabaseName is the name of the database in which the index was created. + string database_name = 4 [(gogoproto.jsontag) = ",omitempty"]; + + // TableName is the name of the table on which the index was created. + string table_name = 5 [(gogoproto.jsontag) = ",omitempty"]; + + // IndexName is the name of the index within the scope of the given table. + string index_name = 6 [(gogoproto.jsontag) = ",omitempty"]; + + // SchemaName is the name of the schema in which the index was created. + string schema_name = 7 [(gogoproto.jsontag) = ",omitempty"]; + + // LeaseholderNodeID indicates the Node ID that is the current leaseholder for the given range. 
+ int32 leaseholder_node_id = 8 [(gogoproto.customname) = "LeaseholderNodeID", (gogoproto.jsontag) = ",omitempty"]; + + // Writes per second is the recent number of keys written per second on + // this range. + double writes_per_second = 9 [(gogoproto.jsontag) = ",omitempty"]; + + // Reads per second is the recent number of keys read per second on + // this range. + double reads_per_second = 10 [(gogoproto.jsontag) = ",omitempty"]; + + // Write bytes per second is the recent number of bytes written per second on + // this range. + double write_bytes_per_second = 11 [(gogoproto.jsontag) = ",omitempty"]; + + // Read bytes per second is the recent number of bytes read per second on + // this range. + double read_bytes_per_second = 12 [(gogoproto.jsontag) = ",omitempty"]; + + // CPU time per second is the recent cpu usage in nanoseconds of this range. + double cpu_time_per_second = 13 [(gogoproto.customname) = "CPUTimePerSecond", (gogoproto.jsontag) = ",omitempty"]; +} diff --git a/pkg/util/log/logutil/BUILD.bazel b/pkg/util/log/logutil/BUILD.bazel index adf86971dd9e..92653761ac17 100644 --- a/pkg/util/log/logutil/BUILD.bazel +++ b/pkg/util/log/logutil/BUILD.bazel @@ -11,6 +11,8 @@ go_library( "//pkg/jobs/jobspb", "//pkg/util/log", "//pkg/util/log/eventpb", + "//pkg/util/log/logpb", + "//pkg/util/stop", "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/util/log/logutil/log_util.go b/pkg/util/log/logutil/log_util.go index 884cf8e36386..59bc6ec65a9e 100644 --- a/pkg/util/log/logutil/log_util.go +++ b/pkg/util/log/logutil/log_util.go @@ -12,11 +12,14 @@ package logutil import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/redact" ) @@ -53,3 +56,26 @@ func LogJobCompletion( log.StructuredEvent(ctx, event) } + +// LogEventsWithDelay logs an eventpb.EventPayload at provided +// delay duration to avoid exceeding the 10 log-line per second limit per node +// on the telemetry logging pipeline. +func LogEventsWithDelay( + ctx context.Context, events []logpb.EventPayload, stopper *stop.Stopper, delay time.Duration, +) { + // Log the first event immediately. + timer := time.NewTimer(0 * time.Second) + defer timer.Stop() + for len(events) > 0 { + select { + case <-stopper.ShouldQuiesce(): + return + case <-timer.C: + event := events[0] + log.StructuredEvent(ctx, event) + events = events[1:] + // Apply a delay to subsequent events. + timer.Reset(delay) + } + } +}
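Reviewer note (not part of the patch): the refactor above moves the pacing loop out of `captured_index_usage_stats.go` into `logutil.LogEventsWithDelay`, so the existing index usage stats job and the new hot ranges scheduler share one implementation. Below is a minimal sketch of how a caller is expected to drive the helper. The package name, function name, and sample field values are hypothetical; the types and the `LogEventsWithDelay` signature are taken from this diff.

```go
// Sketch only: builds a small batch of HotRangesStats events and emits them
// to the TELEMETRY channel, spacing subsequent events by a fixed delay.
package example

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
	"github.com/cockroachdb/cockroach/pkg/util/log/logutil"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// emitExample mirrors what hotRangesLoggingScheduler.start does after it has
// fetched a HotRangesV2 response: convert each range into an event payload,
// then hand the batch to LogEventsWithDelay.
func emitExample(ctx context.Context, stopper *stop.Stopper) {
	ts := timeutil.Now().UnixNano()
	// Two hypothetical hot ranges; the real scheduler fills these fields
	// from the HotRangesV2 response.
	events := []logpb.EventPayload{
		&eventpb.HotRangesStats{
			CommonEventDetails: logpb.CommonEventDetails{Timestamp: ts},
			RangeID:            42,
			Qps:                1250.5,
		},
		&eventpb.HotRangesStats{
			CommonEventDetails: logpb.CommonEventDetails{Timestamp: ts},
			RangeID:            43,
			Qps:                980.0,
		},
	}
	// The first event is logged immediately; each subsequent event waits the
	// given delay. The scheduler passes the value of the
	// server.telemetry.hot_ranges_stats.logging_delay cluster setting here.
	logutil.LogEventsWithDelay(ctx, events, stopper, 1*time.Second)
}
```

With `ReportTopHottestRanges = 5` and the default 1s logging delay, a single iteration spreads at most five `hot_ranges_stats` events over roughly four seconds, keeping the scheduler well under the telemetry pipeline's 10 log-lines-per-second-per-node budget referenced in the comment that this change removes from `captured_index_usage_stats.go`.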