Skip to content

Commit

Permalink
sql: add support for LISTEN/NOTIFY in the pgwire protocol
Browse files Browse the repository at this point in the history
Built on top of#127818, which adds parsing
support, this patch adds the capability to send
asynchronous notifications to clients over the
pgwire protocol.

Part of: cockroachdb#41522

Release note: None
  • Loading branch information
asg0451 committed Jul 29, 2024
1 parent 3ca4780 commit 6be7d4d
Show file tree
Hide file tree
Showing 19 changed files with 161 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,7 @@ GO_TARGETS = [
"//pkg/sql/pgwire/pgerror:pgerror",
"//pkg/sql/pgwire/pgerror:pgerror_test",
"//pkg/sql/pgwire/pgnotice:pgnotice",
"//pkg/sql/pgwire/pgnotification:pgnotification",
"//pkg/sql/pgwire/pgwirebase:pgwirebase",
"//pkg/sql/pgwire/pgwirecancel:pgwirecancel",
"//pkg/sql/pgwire/pgwirecancel:pgwirecancel_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ go_library(
"mvcc_statistics_update_job.go",
"name_util.go",
"notice.go",
"notification.go",
"notify.go",
"opaque.go",
"opt_catalog.go",
Expand Down Expand Up @@ -461,6 +462,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/pgwire/pgnotification",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/pgwire/pgwirecancel",
"//pkg/sql/physicalplan",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3823,6 +3823,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {

p.sessionDataMutatorIterator = ex.dataMutatorIterator
p.noticeSender = nil
p.notificationSender = nil
p.preparedStatements = ex.getPrepStmtsAccessor()
p.sqlCursors = ex.getCursorAccessor()
p.storedProcTxnState = ex.getStoredProcTxnStateAccessor()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ func (ex *connExecutor) execStmtInOpenState(
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
p.sessionDataMutatorIterator.paramStatusUpdater = res
p.noticeSender = res
p.notificationSender = ex.clientComm
ih := &p.instrumentation

if ex.executorType != executorTypeInternal {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,9 @@ type ClientComm interface {
// CreateDrainResult creates a result for a Drain command.
CreateDrainResult(pos CmdPos) DrainResult

// is this good? probably not...
notificationSender

// LockCommunication ensures that no further results are delivered to the
// client. The returned ClientLock can be queried to see what results have
// been already delivered to the client and to discard results that haven't
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/parser/statements"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
Expand Down Expand Up @@ -1695,6 +1696,11 @@ func (icc *internalClientComm) CreateDrainResult(pos CmdPos) DrainResult {
panic("unimplemented")
}

// BufferNotification is part of the ClientComm interface.
func (icc *internalClientComm) BufferNotification(pgnotification.Notification) error {
panic("unimplemented")
}

// Close is part of the ClientLock interface.
func (icc *internalClientComm) Close() {}

Expand Down
15 changes: 6 additions & 9 deletions pkg/sql/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
)

func (p *planner) Listen(ctx context.Context, n *tree.Listen) (planNode, error) {
p.BufferClientNotice(ctx,
pgerror.WithSeverity(
unimplemented.NewWithIssuef(41522, "CRDB does not support LISTEN"),
"ERROR",
),
)
return newZeroNode(nil /* columns */), nil
// TODO: run an EXPERIMENTAL CREATE CHANGEFEED plan/stmt basically with a different sort of sink that hooks up to this guy

// Dummy implementation.
dummyNotificationListens[n.ChannelName.String()] = struct{}{}

return newZeroNode(nil), nil
}
48 changes: 48 additions & 0 deletions pkg/sql/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Notifications is the cluster setting that allows users
// to enable notifications.
var Notifications = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.notifications.enabled",
"enable notifications in the server/client protocol being sent",
true,
settings.WithPublic)

type notificationSender interface {
// BufferNotification buffers the given notification to be flushed to the
// client before the connection is closed.
BufferNotification(pgnotification.Notification) error
}

// BufferClientNotice implements the eval.ClientNotificationSender interface.
func (p *planner) BufferClientNotification(ctx context.Context, notification pgnotification.Notification) {
if log.V(2) {
log.Infof(ctx, "buffered notification: %+v", notification)
}
if !Notifications.Get(&p.ExecCfg().Settings.SV) {
return
}
if err := p.notificationSender.BufferNotification(notification); err != nil {
// This is just an artifact of the dummy impl, probably.
log.Errorf(ctx, "buffering notification: %v", err)
}
}
24 changes: 15 additions & 9 deletions pkg/sql/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ package sql

import (
"context"
"os"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
)

var dummyNotificationListens = make(map[string]struct{})

func (p *planner) Notify(ctx context.Context, n *tree.Notify) (planNode, error) {
p.BufferClientNotice(ctx,
pgerror.WithSeverity(
unimplemented.NewWithIssuef(41522, "CRDB does not support LISTEN"),
"ERROR",
),
)
return newZeroNode(nil /* columns */), nil
// This is a dummy implementation.
if _, ok := dummyNotificationListens[n.ChannelName.String()]; !ok {
return newZeroNode(nil), nil
}
p.BufferClientNotification(ctx, pgnotification.Notification{
Channel: n.ChannelName.String(),
Payload: n.Payload.String(),
PID: int32(os.Getpid()),
})

return newZeroNode(nil), nil
}
1 change: 1 addition & 0 deletions pkg/sql/pgwire/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/pgwire/pgnotification",
"//pkg/sql/pgwire/pgwirebase",
"//pkg/sql/pgwire/pgwirecancel",
"//pkg/sql/sem/catconstants",
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -270,6 +271,26 @@ func (c *conn) bufferNotice(ctx context.Context, noticeErr pgnotice.Notice) erro
return c.writeErrFields(ctx, noticeErr, &c.writerState.buf)
}

// TODO: only send at end of txn, if we're in one. what level do we do this at?
// TODO: synchronize with command execution results writing with `bufferRows`. Currently this is just racey.
func (c *conn) BufferNotification(notif pgnotification.Notification) error {
c.msgBuilder.initMsg(pgwirebase.ServerMsgNotificationResponse)
c.msgBuilder.putInt32(notif.PID)
c.msgBuilder.writeTerminatedString(notif.Channel)
c.msgBuilder.writeTerminatedString(notif.Payload)
if err := c.msgBuilder.finishMsg(&c.writerState.buf); err != nil {
return err
}

// Flush immediately. TODO: is this right?
_, err := c.writerState.buf.WriteTo(c.conn)
if err != nil {
c.setErr(err)
return err
}
return nil
}

func (c *conn) sendInitialConnData(
ctx context.Context,
sqlServer *sql.Server,
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/pgwire/pgnotification/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "pgnotification",
srcs = ["pgnotification.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification",
visibility = ["//visibility:public"],
)
17 changes: 17 additions & 0 deletions pkg/sql/pgwire/pgnotification/pgnotification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package pgnotification

type Notification struct {
Channel string
Payload string
PID int32
}
1 change: 1 addition & 0 deletions pkg/sql/pgwire/pgwirebase/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
ServerMsgEmptyQuery ServerMessageType = 'I'
ServerMsgErrorResponse ServerMessageType = 'E'
ServerMsgNoticeResponse ServerMessageType = 'N'
ServerMsgNotificationResponse ServerMessageType = 'A'
ServerMsgNoData ServerMessageType = 'n'
ServerMsgParameterDescription ServerMessageType = 't'
ServerMsgParameterStatus ServerMessageType = 'S'
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,25 @@ func (s *Server) serveImpl(
sessionID,
)
}()

// don't think we can do it like this because the conn is not thread safe and it's owned by the above goroutine. so do stuff in there

// // Spawn the notification listening (NOTIFY/LISTEN) goroutine, which
// // listens for notifications and sends them to the client.
// procWg.Add(1)
// go func() {
// defer procWg.Done()
// c.handleNotifications(
// ctx,
// authOpt,
// authPipe,
// sqlServer,
// reserved,
// onDefaultIntSizeChange,
// sessionID,
// )
// }()

} else {
// sqlServer == nil means we are in a local test. In this case
// we only need the minimum to make pgx happy.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ type planner struct {
// Do not use this object directly; use the BufferClientNotice() method
// instead.
noticeSender noticeSender
// ditto? idk.
// hooked up to the client comm. this is like a sink.
// TODO: this has to be thread safe wrt the conn and statements executing on it.
notificationSender notificationSender

queryCacheSession querycache.Session

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/eval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/pgwire/pgnotification",
"//pkg/sql/pgwire/pgwirecancel",
"//pkg/sql/privilege",
"//pkg/sql/sem/builtins/builtinsregistry",
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sem/eval/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotification"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -565,6 +566,10 @@ type ClientNoticeSender interface {
SendClientNotice(ctx context.Context, notice pgnotice.Notice) error
}

type ClientNotificationSender interface {
BufferClientNotification(ctx context.Context, notification pgnotification.Notification)
}

// DeferredRoutineSender allows a nested routine to send the information needed
// for its own evaluation to a parent routine. This is used to defer execution
// for tail-call optimization. It can only be used during local execution.
Expand Down
9 changes: 1 addition & 8 deletions pkg/sql/unlisten.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,10 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
)

func (p *planner) Unlisten(ctx context.Context, n *tree.Unlisten) (planNode, error) {
p.BufferClientNotice(ctx,
pgerror.WithSeverity(
unimplemented.NewWithIssuef(41522, "CRDB does not support LISTEN, making UNLISTEN a no-op"),
"NOTICE",
),
)
delete(dummyNotificationListens, n.ChannelName.String())
return newZeroNode(nil /* columns */), nil
}

0 comments on commit 6be7d4d

Please sign in to comment.