From dd9dabede1ded9e9c2e9c7cab031f004cd554af5 Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Mon, 29 Jul 2024 21:07:50 -0400 Subject: [PATCH] sql: add system table to support LISTEN/NOTIFY Add a new system table, system.notifications, to back the LISTEN/NOTIFY mechanism. Part of: #41522 Release note: None --- pkg/ccl/backupccl/system_schema.go | 3 ++ pkg/clusterversion/cockroach_versions.go | 4 ++ pkg/sql/catalog/bootstrap/metadata.go | 5 +- pkg/sql/catalog/catprivilege/system.go | 1 + pkg/sql/catalog/systemschema/system.go | 49 ++++++++++++++----- .../pgwirebase/servermessagetype_string.go | 3 ++ pkg/sql/sem/catconstants/constants.go | 1 + pkg/upgrade/upgrades/BUILD.bazel | 1 + pkg/upgrade/upgrades/upgrades.go | 8 +++ .../upgrades/v24_3_listen_notify_queue.go | 29 +++++++++++ 10 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 pkg/upgrade/upgrades/v24_3_listen_notify_queue.go diff --git a/pkg/ccl/backupccl/system_schema.go b/pkg/ccl/backupccl/system_schema.go index 9d5cbb877392..b10876e96ffe 100644 --- a/pkg/ccl/backupccl/system_schema.go +++ b/pkg/ccl/backupccl/system_schema.go @@ -847,6 +847,9 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{ systemschema.TransactionExecInsightsTable.GetName(): { shouldIncludeInClusterBackup: optOutOfClusterBackup, }, + systemschema.ListenNotifyQueueTable.GetName(): { + shouldIncludeInClusterBackup: optInToClusterBackup, // ? + }, } func rekeySystemTable( diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 0fcbb3523eb8..62548e511f2a 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -268,6 +268,8 @@ const ( // minimum timestamp field. V24_2_LeaseMinTimestamp + V24_3_ListenNotifyQueue + // ************************************************* // Step (1) Add new versions above this comment. // Do not add new versions to a patch release. @@ -333,6 +335,8 @@ var versionTable = [numKeys]roachpb.Version{ V24_2_DeleteTenantSettingsVersion: {Major: 24, Minor: 1, Internal: 10}, V24_2_LeaseMinTimestamp: {Major: 24, Minor: 1, Internal: 12}, + V24_3_ListenNotifyQueue: {Major: 24, Minor: 3, Internal: 0}, + // ************************************************* // Step (2): Add new versions above this comment. // Do not add new versions to a patch release. diff --git a/pkg/sql/catalog/bootstrap/metadata.go b/pkg/sql/catalog/bootstrap/metadata.go index 3b5de92eb52a..1bc00b87342a 100644 --- a/pkg/sql/catalog/bootstrap/metadata.go +++ b/pkg/sql/catalog/bootstrap/metadata.go @@ -455,6 +455,9 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) { target.AddDescriptor(systemschema.TransactionExecInsightsTable) target.AddDescriptor(systemschema.StatementExecInsightsTable) + // Tables introduced in 24.2. + target.AddDescriptor(systemschema.ListenNotifyQueueTable) + // Adding a new system table? It should be added here to the metadata schema, // and also created as a migration for older clusters. // If adding a call to AddDescriptor or AddDescriptorForSystemTenant, please @@ -467,7 +470,7 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) { // NumSystemTablesForSystemTenant is the number of system tables defined on // the system tenant. This constant is only defined to avoid having to manually // update auto stats tests every time a new system table is added. -const NumSystemTablesForSystemTenant = 56 +const NumSystemTablesForSystemTenant = 57 // addSplitIDs adds a split point for each of the PseudoTableIDs to the supplied // MetadataSchema. diff --git a/pkg/sql/catalog/catprivilege/system.go b/pkg/sql/catalog/catprivilege/system.go index 46ecfcc0848a..845538d7b61d 100644 --- a/pkg/sql/catalog/catprivilege/system.go +++ b/pkg/sql/catalog/catprivilege/system.go @@ -78,6 +78,7 @@ var ( catconstants.MVCCStatistics, catconstants.TxnExecInsightsTableName, catconstants.StmtExecInsightsTableName, + catconstants.ListenNotifyQueueTableName, } readWriteSystemSequences = []catconstants.SystemTableName{ diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index cfddbabec730..f62b63b01c66 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -120,6 +120,13 @@ CREATE TABLE system.tenants ( INDEX tenants_service_mode_idx (service_mode ASC) );` + ListenNotifyQueueTableSchema = ` +CREATE TABLE system.notifications ( + channel STRING NOT NULL PRIMARY KEY, + payload STRING, + pid INT4 NOT NULL, +);` + // RoleIDSequenceSchema starts at 100 so we have reserved IDs for special // roles such as root and admin. RoleIDSequenceSchema = ` @@ -499,8 +506,8 @@ CREATE TABLE system.scheduled_jobs ( FAMILY sched (schedule_id, next_run, schedule_state), FAMILY other ( - schedule_name, created, owner, schedule_expr, - schedule_details, executor_type, execution_args + schedule_name, created, owner, schedule_expr, + schedule_details, executor_type, execution_args ) )` @@ -966,11 +973,11 @@ CREATE TABLE system.job_info ( SpanStatsUniqueKeysTableSchema = ` CREATE TABLE system.span_stats_unique_keys ( -- Every key has a unique id. We can't use the value of the key itself - -- because we want the cost of storing the key to + -- because we want the cost of storing the key to -- amortize with repeated references. A UUID is 16 bytes, -- but a roachpb.Key can be arbitrarily large. id UUID DEFAULT gen_random_uuid(), - + -- key_bytes stores the raw bytes of a roachpb.Key. key_bytes BYTES, CONSTRAINT "primary" PRIMARY KEY (id), @@ -984,17 +991,17 @@ CREATE TABLE system.span_stats_unique_keys ( CREATE TABLE system.span_stats_buckets ( -- Every bucket has a unique id. id UUID DEFAULT gen_random_uuid(), - + -- The bucket belongs to sample_id sample_id UUID NOT NULL, - + -- The uuid of this bucket's span's start key. start_key_id UUID NOT NULL, - + -- The uuid of this bucket's span's start key. end_key_id UUID NOT NULL, - - -- The number of KV requests destined for this span. + + -- The number of KV requests destined for this span. requests INT NOT NULL, CONSTRAINT "primary" PRIMARY KEY (id), INDEX buckets_sample_id_idx (sample_id ASC), @@ -1007,7 +1014,7 @@ CREATE TABLE system.span_stats_buckets ( CREATE TABLE system.span_stats_samples ( -- Every sample has a unique id. id UUID DEFAULT gen_random_uuid(), - + -- sample_time represents the time the sample ended. -- The sample's start time is therefore equal to sample_time - keyvissettings.SampleInterval. sample_time TIMESTAMP NOT NULL DEFAULT now(), @@ -1068,7 +1075,7 @@ CREATE TABLE system.mvcc_statistics ( user_priority STRING, retries INT8, last_retry_reason STRING, - problems INT[], + problems INT[], causes INT[], stmt_execution_ids STRING[], cpu_sql_nanos INT8, @@ -1418,6 +1425,7 @@ func MakeSystemTables() []SystemTable { SystemMVCCStatisticsTable, StatementExecInsightsTable, TransactionExecInsightsTable, + ListenNotifyQueueTable, } } @@ -4742,3 +4750,22 @@ var ( // SpanConfigurationsTableName represents system.span_configurations. var SpanConfigurationsTableName = tree.NewTableNameWithSchema("system", catconstants.PublicSchemaName, tree.Name(catconstants.SpanConfigurationsTableName)) + +var ListenNotifyQueueTable = makeSystemTable( + ListenNotifyQueueTableSchema, + systemTable( + catconstants.ListenNotifyQueueTableName, + descpb.InvalidID, // dynamically assigned table ID + []descpb.ColumnDescriptor{ + {Name: "channel", ID: 1, Type: types.String}, + {Name: "payload", ID: 2, Type: types.String, Nullable: true}, + {Name: "pid", ID: 3, Type: types.Int4}, + }, + []descpb.ColumnFamilyDescriptor{{ + Name: "primary", + ID: 0, + ColumnNames: []string{"channel", "payload", "pid"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3}, + }}, + pk("channel"), + )) diff --git a/pkg/sql/pgwire/pgwirebase/servermessagetype_string.go b/pkg/sql/pgwire/pgwirebase/servermessagetype_string.go index 2fba13917155..8cee0286ecd5 100644 --- a/pkg/sql/pgwire/pgwirebase/servermessagetype_string.go +++ b/pkg/sql/pgwire/pgwirebase/servermessagetype_string.go @@ -21,6 +21,7 @@ func _() { _ = x[ServerMsgEmptyQuery-73] _ = x[ServerMsgErrorResponse-69] _ = x[ServerMsgNoticeResponse-78] + _ = x[ServerMsgNotificationResponse-65] _ = x[ServerMsgNoData-110] _ = x[ServerMsgParameterDescription-116] _ = x[ServerMsgParameterStatus-83] @@ -58,6 +59,8 @@ func (i ServerMessageType) String() string { return "ServerMsgErrorResponse" case ServerMsgNoticeResponse: return "ServerMsgNoticeResponse" + case ServerMsgNotificationResponse: + return "ServerMsgNotificationResponse" case ServerMsgNoData: return "ServerMsgNoData" case ServerMsgParameterDescription: diff --git a/pkg/sql/sem/catconstants/constants.go b/pkg/sql/sem/catconstants/constants.go index 8b2636f2289c..7f60c17ce459 100644 --- a/pkg/sql/sem/catconstants/constants.go +++ b/pkg/sql/sem/catconstants/constants.go @@ -58,6 +58,7 @@ const ( DescIDSequenceTableName SystemTableName = "descriptor_id_seq" TenantIDSequenceTableName SystemTableName = "tenant_id_seq" TenantsTableName SystemTableName = "tenants" + ListenNotifyQueueTableName SystemTableName = "notifications" LeaseTableName SystemTableName = "lease" EventLogTableName SystemTableName = "eventlog" RangeEventTableName SystemTableName = "rangelog" diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 175052189fcd..0b9bc478c35e 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "v24_2_delete_version_tenant_settings.go", "v24_2_tenant_rates.go", "v24_2_tenant_system_tables.go", + "v24_3_listen_notify_queue.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades", visibility = ["//visibility:public"], diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index 98d2ca243349..ee2c07513d44 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -156,6 +156,14 @@ var upgrades = []upgradebase.Upgrade{ upgrade.RestoreActionImplemented("bad row skipped when restoring system.tenant_settings"), ), + upgrade.NewTenantUpgrade( + "add new table for listen/notify queue", + clusterversion.V24_3_ListenNotifyQueue.Version(), + upgrade.NoPrecondition, + createListenNotifyQueyeTables, + upgrade.RestoreActionNotRequired("idk lol"), + ), + // Note: when starting a new release version, the first upgrade (for // Vxy_zStart) must be a newFirstUpgrade. Keep this comment at the bottom. } diff --git a/pkg/upgrade/upgrades/v24_3_listen_notify_queue.go b/pkg/upgrade/upgrades/v24_3_listen_notify_queue.go new file mode 100644 index 000000000000..f23b920b299d --- /dev/null +++ b/pkg/upgrade/upgrades/v24_3_listen_notify_queue.go @@ -0,0 +1,29 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/upgrade" +) + +func createListenNotifyQueyeTables( + ctx context.Context, cv clusterversion.ClusterVersion, d upgrade.TenantDeps, +) error { + if err := createSystemTable(ctx, d.DB, d.Settings, d.Codec, systemschema.ListenNotifyQueueTable, tree.LocalityLevelTable); err != nil { + return err + } + return nil +}