diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 42abcf12ce60..84e28cdbfa70 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2045,6 +2045,7 @@ GO_TARGETS = [ "//pkg/sql/pgwire/pgerror:pgerror", "//pkg/sql/pgwire/pgerror:pgerror_test", "//pkg/sql/pgwire/pgnotice:pgnotice", + "//pkg/sql/pgwire/pgnotification:pgnotification", "//pkg/sql/pgwire/pgwirebase:pgwirebase", "//pkg/sql/pgwire/pgwirecancel:pgwirecancel", "//pkg/sql/pgwire/pgwirecancel:pgwirecancel_test", diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 8163e1261259..1fba1e474526 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -158,6 +158,7 @@ go_library( "mvcc_statistics_update_job.go", "name_util.go", "notice.go", + "notification.go", "notify.go", "opaque.go", "opt_catalog.go", @@ -461,6 +462,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", + "//pkg/sql/pgwire/pgnotification", "//pkg/sql/pgwire/pgwirebase", "//pkg/sql/pgwire/pgwirecancel", "//pkg/sql/physicalplan", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 8e07688619fa..167d6410fd7d 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3823,6 +3823,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) { p.sessionDataMutatorIterator = ex.dataMutatorIterator p.noticeSender = nil + p.notificationSender = nil p.preparedStatements = ex.getPrepStmtsAccessor() p.sqlCursors = ex.getCursorAccessor() p.storedProcTxnState = ex.getStoredProcTxnStateAccessor() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 7f92e6e55d7f..498d811bba1a 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -479,6 +479,7 @@ func (ex *connExecutor) execStmtInOpenState( ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS) p.sessionDataMutatorIterator.paramStatusUpdater = res p.noticeSender = res + p.notificationSender = ex.clientComm ih := &p.instrumentation if ex.executorType != executorTypeInternal { diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index 44c7dafa891e..b313a2b7ddec 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -763,6 +763,9 @@ type ClientComm interface { // CreateDrainResult creates a result for a Drain command. CreateDrainResult(pos CmdPos) DrainResult + // is this good? probably not... + notificationSender + // LockCommunication ensures that no further results are delivered to the // client. The returned ClientLock can be queried to see what results have // been already delivered to the client and to discard results that haven't diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index f59d40f25356..24bf5d166fff 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/parser/statements" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/regions" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" @@ -1695,6 +1696,11 @@ func (icc *internalClientComm) CreateDrainResult(pos CmdPos) DrainResult { panic("unimplemented") } +// BufferNotification is part of the ClientComm interface. +func (icc *internalClientComm) BufferNotification(pgnotification.Notification) error { + panic("unimplemented") +} + // Close is part of the ClientLock interface. func (icc *internalClientComm) Close() {} diff --git a/pkg/sql/listen.go b/pkg/sql/listen.go index 03b958be767b..5b2bac475789 100644 --- a/pkg/sql/listen.go +++ b/pkg/sql/listen.go @@ -13,17 +13,14 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" ) func (p *planner) Listen(ctx context.Context, n *tree.Listen) (planNode, error) { - p.BufferClientNotice(ctx, - pgerror.WithSeverity( - unimplemented.NewWithIssuef(41522, "CRDB does not support LISTEN"), - "ERROR", - ), - ) - return newZeroNode(nil /* columns */), nil + // TODO: run an EXPERIMENTAL CREATE CHANGEFEED plan/stmt basically with a different sort of sink that hooks up to this guy + + // Dummy implementation. + dummyNotificationListens[n.ChannelName.String()] = struct{}{} + + return newZeroNode(nil), nil } diff --git a/pkg/sql/notification.go b/pkg/sql/notification.go new file mode 100644 index 000000000000..58434b1f9469 --- /dev/null +++ b/pkg/sql/notification.go @@ -0,0 +1,48 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// Notifications is the cluster setting that allows users +// to enable notifications. +var Notifications = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "sql.notifications.enabled", + "enable notifications in the server/client protocol being sent", + true, + settings.WithPublic) + +type notificationSender interface { + // BufferNotification buffers the given notification to be flushed to the + // client before the connection is closed. + BufferNotification(pgnotification.Notification) error +} + +// BufferClientNotice implements the eval.ClientNotificationSender interface. +func (p *planner) BufferClientNotification(ctx context.Context, notification pgnotification.Notification) { + if log.V(2) { + log.Infof(ctx, "buffered notification: %+v", notification) + } + if !Notifications.Get(&p.ExecCfg().Settings.SV) { + return + } + if err := p.notificationSender.BufferNotification(notification); err != nil { + // This is just an artifact of the dummy impl, probably. + log.Errorf(ctx, "buffering notification: %v", err) + } +} diff --git a/pkg/sql/notify.go b/pkg/sql/notify.go index d3f10934e0b8..5d1e93d6bed4 100644 --- a/pkg/sql/notify.go +++ b/pkg/sql/notify.go @@ -12,18 +12,24 @@ package sql import ( "context" + "os" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" ) +var dummyNotificationListens = make(map[string]struct{}) + func (p *planner) Notify(ctx context.Context, n *tree.Notify) (planNode, error) { - p.BufferClientNotice(ctx, - pgerror.WithSeverity( - unimplemented.NewWithIssuef(41522, "CRDB does not support LISTEN"), - "ERROR", - ), - ) - return newZeroNode(nil /* columns */), nil + // This is a dummy implementation. + if _, ok := dummyNotificationListens[n.ChannelName.String()]; !ok { + return newZeroNode(nil), nil + } + p.BufferClientNotification(ctx, pgnotification.Notification{ + Channel: n.ChannelName.String(), + Payload: n.Payload.String(), + PID: int32(os.Getpid()), + }) + + return newZeroNode(nil), nil } diff --git a/pkg/sql/pgwire/BUILD.bazel b/pkg/sql/pgwire/BUILD.bazel index 2208f9985320..c115af7670f7 100644 --- a/pkg/sql/pgwire/BUILD.bazel +++ b/pkg/sql/pgwire/BUILD.bazel @@ -50,6 +50,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", + "//pkg/sql/pgwire/pgnotification", "//pkg/sql/pgwire/pgwirebase", "//pkg/sql/pgwire/pgwirecancel", "//pkg/sql/sem/catconstants", diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 158cc72beb72..b4e878579bd8 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -270,6 +271,26 @@ func (c *conn) bufferNotice(ctx context.Context, noticeErr pgnotice.Notice) erro return c.writeErrFields(ctx, noticeErr, &c.writerState.buf) } +// TODO: only send at end of txn, if we're in one. what level do we do this at? +// TODO: synchronize with command execution results writing with `bufferRows`. Currently this is just racey. +func (c *conn) BufferNotification(notif pgnotification.Notification) error { + c.msgBuilder.initMsg(pgwirebase.ServerMsgNotificationResponse) + c.msgBuilder.putInt32(notif.PID) + c.msgBuilder.writeTerminatedString(notif.Channel) + c.msgBuilder.writeTerminatedString(notif.Payload) + if err := c.msgBuilder.finishMsg(&c.writerState.buf); err != nil { + return err + } + + // Flush immediately. TODO: is this right? + _, err := c.writerState.buf.WriteTo(c.conn) + if err != nil { + c.setErr(err) + return err + } + return nil +} + func (c *conn) sendInitialConnData( ctx context.Context, sqlServer *sql.Server, diff --git a/pkg/sql/pgwire/pgnotification/BUILD.bazel b/pkg/sql/pgwire/pgnotification/BUILD.bazel new file mode 100644 index 000000000000..9bfa3e7751ea --- /dev/null +++ b/pkg/sql/pgwire/pgnotification/BUILD.bazel @@ -0,0 +1,8 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "pgnotification", + srcs = ["pgnotification.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification", + visibility = ["//visibility:public"], +) diff --git a/pkg/sql/pgwire/pgnotification/pgnotification.go b/pkg/sql/pgwire/pgnotification/pgnotification.go new file mode 100644 index 000000000000..928bc7ddfc73 --- /dev/null +++ b/pkg/sql/pgwire/pgnotification/pgnotification.go @@ -0,0 +1,17 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package pgnotification + +type Notification struct { + Channel string + Payload string + PID int32 +} diff --git a/pkg/sql/pgwire/pgwirebase/msg.go b/pkg/sql/pgwire/pgwirebase/msg.go index 8e4b5156c915..89b912871566 100644 --- a/pkg/sql/pgwire/pgwirebase/msg.go +++ b/pkg/sql/pgwire/pgwirebase/msg.go @@ -51,6 +51,7 @@ const ( ServerMsgEmptyQuery ServerMessageType = 'I' ServerMsgErrorResponse ServerMessageType = 'E' ServerMsgNoticeResponse ServerMessageType = 'N' + ServerMsgNotificationResponse ServerMessageType = 'A' ServerMsgNoData ServerMessageType = 'n' ServerMsgParameterDescription ServerMessageType = 't' ServerMsgParameterStatus ServerMessageType = 'S' diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 5a37edb09844..1c1c3c8de7a2 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -1043,6 +1043,25 @@ func (s *Server) serveImpl( sessionID, ) }() + + // don't think we can do it like this because the conn is not thread safe and it's owned by the above goroutine. so do stuff in there + + // // Spawn the notification listening (NOTIFY/LISTEN) goroutine, which + // // listens for notifications and sends them to the client. + // procWg.Add(1) + // go func() { + // defer procWg.Done() + // c.handleNotifications( + // ctx, + // authOpt, + // authPipe, + // sqlServer, + // reserved, + // onDefaultIntSizeChange, + // sessionID, + // ) + // }() + } else { // sqlServer == nil means we are in a local test. In this case // we only need the minimum to make pgx happy. diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index bc06b9445ff4..7b70f57217ad 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -273,6 +273,10 @@ type planner struct { // Do not use this object directly; use the BufferClientNotice() method // instead. noticeSender noticeSender + // ditto? idk. + // hooked up to the client comm. this is like a sink. + // TODO: this has to be thread safe wrt the conn and statements executing on it. + notificationSender notificationSender queryCacheSession querycache.Session diff --git a/pkg/sql/sem/eval/BUILD.bazel b/pkg/sql/sem/eval/BUILD.bazel index 6e3a3e1071bf..483808041220 100644 --- a/pkg/sql/sem/eval/BUILD.bazel +++ b/pkg/sql/sem/eval/BUILD.bazel @@ -61,6 +61,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", + "//pkg/sql/pgwire/pgnotification", "//pkg/sql/pgwire/pgwirecancel", "//pkg/sql/privilege", "//pkg/sql/sem/builtins/builtinsregistry", diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index 3bb02a8348b7..25c7959d0cc4 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -565,6 +566,10 @@ type ClientNoticeSender interface { SendClientNotice(ctx context.Context, notice pgnotice.Notice) error } +type ClientNotificationSender interface { + BufferClientNotification(ctx context.Context, notification pgnotification.Notification) +} + // DeferredRoutineSender allows a nested routine to send the information needed // for its own evaluation to a parent routine. This is used to defer execution // for tail-call optimization. It can only be used during local execution. diff --git a/pkg/sql/unlisten.go b/pkg/sql/unlisten.go index 997733025792..eafa04b45d29 100644 --- a/pkg/sql/unlisten.go +++ b/pkg/sql/unlisten.go @@ -13,17 +13,10 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" ) func (p *planner) Unlisten(ctx context.Context, n *tree.Unlisten) (planNode, error) { - p.BufferClientNotice(ctx, - pgerror.WithSeverity( - unimplemented.NewWithIssuef(41522, "CRDB does not support LISTEN, making UNLISTEN a no-op"), - "NOTICE", - ), - ) + delete(dummyNotificationListens, n.ChannelName.String()) return newZeroNode(nil /* columns */), nil }