Skip to content

Commit

Permalink
sql: add system table to support LISTEN/NOTIFY
Browse files Browse the repository at this point in the history
Add a new system table, system.notifications, to
back the LISTEN/NOTIFY mechanism.

Part of: cockroachdb#41522

Release note: None
  • Loading branch information
asg0451 committed Jul 30, 2024
1 parent 6be7d4d commit dd9dabe
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,9 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
systemschema.TransactionExecInsightsTable.GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.ListenNotifyQueueTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup, // ?
},
}

func rekeySystemTable(
Expand Down
4 changes: 4 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ const (
// minimum timestamp field.
V24_2_LeaseMinTimestamp

V24_3_ListenNotifyQueue

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -333,6 +335,8 @@ var versionTable = [numKeys]roachpb.Version{
V24_2_DeleteTenantSettingsVersion: {Major: 24, Minor: 1, Internal: 10},
V24_2_LeaseMinTimestamp: {Major: 24, Minor: 1, Internal: 12},

V24_3_ListenNotifyQueue: {Major: 24, Minor: 3, Internal: 0},

// *************************************************
// Step (2): Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/catalog/bootstrap/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,9 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) {
target.AddDescriptor(systemschema.TransactionExecInsightsTable)
target.AddDescriptor(systemschema.StatementExecInsightsTable)

// Tables introduced in 24.2.
target.AddDescriptor(systemschema.ListenNotifyQueueTable)

// Adding a new system table? It should be added here to the metadata schema,
// and also created as a migration for older clusters.
// If adding a call to AddDescriptor or AddDescriptorForSystemTenant, please
Expand All @@ -467,7 +470,7 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) {
// NumSystemTablesForSystemTenant is the number of system tables defined on
// the system tenant. This constant is only defined to avoid having to manually
// update auto stats tests every time a new system table is added.
const NumSystemTablesForSystemTenant = 56
const NumSystemTablesForSystemTenant = 57

// addSplitIDs adds a split point for each of the PseudoTableIDs to the supplied
// MetadataSchema.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/catprivilege/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var (
catconstants.MVCCStatistics,
catconstants.TxnExecInsightsTableName,
catconstants.StmtExecInsightsTableName,
catconstants.ListenNotifyQueueTableName,
}

readWriteSystemSequences = []catconstants.SystemTableName{
Expand Down
49 changes: 38 additions & 11 deletions pkg/sql/catalog/systemschema/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ CREATE TABLE system.tenants (
INDEX tenants_service_mode_idx (service_mode ASC)
);`

ListenNotifyQueueTableSchema = `
CREATE TABLE system.notifications (
channel STRING NOT NULL PRIMARY KEY,
payload STRING,
pid INT4 NOT NULL,
);`

// RoleIDSequenceSchema starts at 100 so we have reserved IDs for special
// roles such as root and admin.
RoleIDSequenceSchema = `
Expand Down Expand Up @@ -499,8 +506,8 @@ CREATE TABLE system.scheduled_jobs (
FAMILY sched (schedule_id, next_run, schedule_state),
FAMILY other (
schedule_name, created, owner, schedule_expr,
schedule_details, executor_type, execution_args
schedule_name, created, owner, schedule_expr,
schedule_details, executor_type, execution_args
)
)`

Expand Down Expand Up @@ -966,11 +973,11 @@ CREATE TABLE system.job_info (
SpanStatsUniqueKeysTableSchema = `
CREATE TABLE system.span_stats_unique_keys (
-- Every key has a unique id. We can't use the value of the key itself
-- because we want the cost of storing the key to
-- because we want the cost of storing the key to
-- amortize with repeated references. A UUID is 16 bytes,
-- but a roachpb.Key can be arbitrarily large.
id UUID DEFAULT gen_random_uuid(),
-- key_bytes stores the raw bytes of a roachpb.Key.
key_bytes BYTES,
CONSTRAINT "primary" PRIMARY KEY (id),
Expand All @@ -984,17 +991,17 @@ CREATE TABLE system.span_stats_unique_keys (
CREATE TABLE system.span_stats_buckets (
-- Every bucket has a unique id.
id UUID DEFAULT gen_random_uuid(),
-- The bucket belongs to sample_id
sample_id UUID NOT NULL,
-- The uuid of this bucket's span's start key.
start_key_id UUID NOT NULL,
-- The uuid of this bucket's span's start key.
end_key_id UUID NOT NULL,
-- The number of KV requests destined for this span.
-- The number of KV requests destined for this span.
requests INT NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (id),
INDEX buckets_sample_id_idx (sample_id ASC),
Expand All @@ -1007,7 +1014,7 @@ CREATE TABLE system.span_stats_buckets (
CREATE TABLE system.span_stats_samples (
-- Every sample has a unique id.
id UUID DEFAULT gen_random_uuid(),
-- sample_time represents the time the sample ended.
-- The sample's start time is therefore equal to sample_time - keyvissettings.SampleInterval.
sample_time TIMESTAMP NOT NULL DEFAULT now(),
Expand Down Expand Up @@ -1068,7 +1075,7 @@ CREATE TABLE system.mvcc_statistics (
user_priority STRING,
retries INT8,
last_retry_reason STRING,
problems INT[],
problems INT[],
causes INT[],
stmt_execution_ids STRING[],
cpu_sql_nanos INT8,
Expand Down Expand Up @@ -1418,6 +1425,7 @@ func MakeSystemTables() []SystemTable {
SystemMVCCStatisticsTable,
StatementExecInsightsTable,
TransactionExecInsightsTable,
ListenNotifyQueueTable,
}
}

Expand Down Expand Up @@ -4742,3 +4750,22 @@ var (

// SpanConfigurationsTableName represents system.span_configurations.
var SpanConfigurationsTableName = tree.NewTableNameWithSchema("system", catconstants.PublicSchemaName, tree.Name(catconstants.SpanConfigurationsTableName))

var ListenNotifyQueueTable = makeSystemTable(
ListenNotifyQueueTableSchema,
systemTable(
catconstants.ListenNotifyQueueTableName,
descpb.InvalidID, // dynamically assigned table ID
[]descpb.ColumnDescriptor{
{Name: "channel", ID: 1, Type: types.String},
{Name: "payload", ID: 2, Type: types.String, Nullable: true},
{Name: "pid", ID: 3, Type: types.Int4},
},
[]descpb.ColumnFamilyDescriptor{{
Name: "primary",
ID: 0,
ColumnNames: []string{"channel", "payload", "pid"},
ColumnIDs: []descpb.ColumnID{1, 2, 3},
}},
pk("channel"),
))
3 changes: 3 additions & 0 deletions pkg/sql/pgwire/pgwirebase/servermessagetype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/sql/sem/catconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
DescIDSequenceTableName SystemTableName = "descriptor_id_seq"
TenantIDSequenceTableName SystemTableName = "tenant_id_seq"
TenantsTableName SystemTableName = "tenants"
ListenNotifyQueueTableName SystemTableName = "notifications"
LeaseTableName SystemTableName = "lease"
EventLogTableName SystemTableName = "eventlog"
RangeEventTableName SystemTableName = "rangelog"
Expand Down
1 change: 1 addition & 0 deletions pkg/upgrade/upgrades/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"v24_2_delete_version_tenant_settings.go",
"v24_2_tenant_rates.go",
"v24_2_tenant_system_tables.go",
"v24_3_listen_notify_queue.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades",
visibility = ["//visibility:public"],
Expand Down
8 changes: 8 additions & 0 deletions pkg/upgrade/upgrades/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ var upgrades = []upgradebase.Upgrade{
upgrade.RestoreActionImplemented("bad row skipped when restoring system.tenant_settings"),
),

upgrade.NewTenantUpgrade(
"add new table for listen/notify queue",
clusterversion.V24_3_ListenNotifyQueue.Version(),
upgrade.NoPrecondition,
createListenNotifyQueyeTables,
upgrade.RestoreActionNotRequired("idk lol"),
),

// Note: when starting a new release version, the first upgrade (for
// Vxy_zStart) must be a newFirstUpgrade. Keep this comment at the bottom.
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/upgrade/upgrades/v24_3_listen_notify_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package upgrades

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/upgrade"
)

func createListenNotifyQueyeTables(
ctx context.Context, cv clusterversion.ClusterVersion, d upgrade.TenantDeps,
) error {
if err := createSystemTable(ctx, d.DB, d.Settings, d.Codec, systemschema.ListenNotifyQueueTable, tree.LocalityLevelTable); err != nil {
return err
}
return nil
}

0 comments on commit dd9dabe

Please sign in to comment.