Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
76279: sql,kvserver: stop gossiping the system config r=ajwerner a=ajwerner

The first commit removes the system config gossip trigger and the client code to set it,
after a version gate has been passed. The primary major change is that in 22.1 binaries, we
no longer rely on the gossip data at any point, instead we adopt the systemconfigwatcher which
is backed by a rangefeed. Part of this change is to adopt that separate data source in the
`GossipSubscription` implementation of the `Connector` API. 

Subsequently, a number of tests needed updating. 

Remove sql.catalog.unsafe_skip_system_config_trigger.enabled
This cluster setting is no longer useful.

Fixes #54477.
Fixes #70560.

Release note (sql change): The limitation that schema change statements in a transaction could not follow
DML statements has been lifted. 

76523: sql: add option to enable/disable txn id cache  r=Azhng a=Azhng

Reviewer note: only last commit is relevant

---

Resolves #76329

Release note (sql change): when `sql.contention.txn_id_cache.max_size`
is set to 0, it would effectively turn off transaction ID cache.

76613: ccl/sqlproxyccl: cleanup PG interceptor APIs r=JeffSwenson a=jaylim-crl

Informs #76000.

#### ccl/sqlproxyccl: update ForwardMsg to take in an io.Writer

Previously, interceptors connect a source to a destination. This can be
awkward during connection migration because we will now need to update the
destination for an existing interceptor. Adding an UpdateWriter (or similar)
does not seem right.

This PR changes how interceptors work. Instead of connecting one end to
another, interceptors are now one sided only. When attempting a write through
ForwardMsg, callers will need to pass in a destination of type io.Writer,
resulting in a much cleaner API when it comes to connection migration. We
will also remove WriteMsg in this commit. This makes interceptors purely
readers wrapping net.Conn objects.

Since interceptors no longer have destinations, keeping the closed field is
not very useful, so this commit removes that as well. A warning has been
added to ensure that callers do not reuse interceptors or destinations whenever
a ReadMsg or ForwardMsg call returns an error.

#### ccl/sqlproxyccl: replace errPanicWriter with a writer that returns an error

Previously, we had an errPanicWriter struct that panics whenever a Write
call was made. This is used because Receive on the pgproto3 backend and
frontend instances must not call Write. However, panics are not ideal, so
this commit replaces that with a regular writer that just returns an error
when Write is called.

#### ccl/sqlproxyccl: use a default buffer size of 8K within the interceptors 

Previously, we returned an error whenever callers attempt to create
interceptors with a small buffer size. This case is very uncommon, and the API
can be awkward since we now need to handle the error case. To address that,
this commit updates the interceptor's behavior such that we default to an 8K
buffer whenever a buffer size smaller than 5 bytes is used. Since sqlproxy is
the only user, this seems to be a reasonable tradeoff.

At the same time, we also make the specialized interceptors to default to
a buffer size of 8K bytes.

Release note: None

Co-authored-by: Andrew Werner <awerner32@gmail.com>
Co-authored-by: Azhng <archer.xn@gmail.com>
Co-authored-by: Jay <jay@cockroachlabs.com>
  • Loading branch information
4 people committed Feb 16, 2022
4 parents 05d9b84 + 0e45b5f + cb35150 + 6dce0bf commit 892c640
Show file tree
Hide file tree
Showing 85 changed files with 940 additions and 1,178 deletions.
4 changes: 2 additions & 2 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use (set to 0 to disable)
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
Expand Down Expand Up @@ -176,4 +176,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-64 set the active cluster version in the format '<major>.<minor>'
version version 21.2-66 set the active cluster version in the format '<major>.<minor>'
4 changes: 2 additions & 2 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<tr><td><code>server.web_session.purge.period</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the time until old sessions are deleted</td></tr>
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use (set to 0 to disable)</td></tr>
<tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
Expand Down Expand Up @@ -188,6 +188,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-64</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-66</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_test(
"//pkg/cloud",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -115,3 +116,6 @@ func (t *testServerShim) Engines() []storage.Engine { panic(unsup
func (t *testServerShim) MetricsRecorder() *status.MetricsRecorder { panic(unsupportedShimMethod) }
func (t *testServerShim) CollectionFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigKVSubscriber() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SystemConfigProvider() config.SystemConfigProvider {
panic(unsupportedShimMethod)
}
17 changes: 11 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Connector struct {
client *client
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
systemConfigChannels map[chan<- struct{}]struct{}
}

settingsMu struct {
Expand Down Expand Up @@ -140,6 +140,7 @@ func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
}

c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
c.mu.systemConfigChannels = make(map[chan<- struct{}]struct{})
c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue)
return c
Expand Down Expand Up @@ -250,7 +251,7 @@ var gossipSubsHandlers = map[string]func(*Connector, context.Context, string, ro
// Subscribe to all *NodeDescriptor updates.
gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Connector).updateNodeAddress,
// Subscribe to a filtered view of *SystemConfig updates.
gossip.KeySystemConfig: (*Connector).updateSystemConfig,
gossip.KeyDeprecatedSystemConfig: (*Connector).updateSystemConfig,
}

var gossipSubsPatterns = func() []string {
Expand Down Expand Up @@ -322,7 +323,7 @@ func (c *Connector) updateSystemConfig(ctx context.Context, key string, content
c.mu.Lock()
defer c.mu.Unlock()
c.mu.systemConfig = cfg
for _, c := range c.mu.systemConfigChannels {
for c := range c.mu.systemConfigChannels {
select {
case c <- struct{}{}:
default:
Expand All @@ -342,20 +343,24 @@ func (c *Connector) GetSystemConfig() *config.SystemConfig {

// RegisterSystemConfigChannel implements the config.SystemConfigProvider
// interface.
func (c *Connector) RegisterSystemConfigChannel() <-chan struct{} {
func (c *Connector) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) {
// Create channel that receives new system config notifications. The channel
// has a size of 1 to prevent connector from having to block on it.
ch := make(chan struct{}, 1)

c.mu.Lock()
defer c.mu.Unlock()
c.mu.systemConfigChannels = append(c.mu.systemConfigChannels, ch)
c.mu.systemConfigChannels[ch] = struct{}{}

// Notify the channel right away if we have a config.
if c.mu.systemConfig != nil {
ch <- struct{}{}
}
return ch
return ch, func() {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.mu.systemConfigChannels, ch)
}
}

// RangeLookup implements the kvcoord.RangeDescriptorDB interface.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.Gossip
panic(err)
}
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeySystemConfig,
Key: gossip.KeyDeprecatedSystemConfig,
Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}),
PatternMatched: gossip.KeySystemConfig,
PatternMatched: gossip.KeyDeprecatedSystemConfig,
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
// Test config.SystemConfigProvider impl. Should not have a SystemConfig yet.
sysCfg := c.GetSystemConfig()
require.Nil(t, sysCfg)
sysCfgC := c.RegisterSystemConfigChannel()
sysCfgC, _ := c.RegisterSystemConfigChannel()
require.Len(t, sysCfgC, 0)

// Return first SystemConfig response.
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values)

// A newly registered SystemConfig channel will be immediately notified.
sysCfgC2 := c.RegisterSystemConfigChannel()
sysCfgC2, _ := c.RegisterSystemConfigChannel()
require.Len(t, sysCfgC2, 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,7 @@ ALTER TABLE t65064 INJECT STATISTICS '[
}
]';

query T
query T retry
SELECT * FROM [EXPLAIN SELECT * FROM t65064 WHERE username = 'kharris'] OFFSET 2
----
·
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ go_test(
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/systemconfigwatcher",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigtestutils",
"//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster",
Expand Down
33 changes: 19 additions & 14 deletions pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster"
Expand Down Expand Up @@ -106,14 +106,15 @@ func TestDataDriven(t *testing.T) {
{
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */)
defer spanConfigTestCluster.Cleanup()

kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
underlyingGossip := tc.Server(0).GossipI().(*gossip.Gossip)
systemConfig := tc.Server(0).SystemConfigProvider().(*systemconfigwatcher.Cache)

systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
Expand Down Expand Up @@ -161,15 +162,19 @@ func TestDataDriven(t *testing.T) {
// (i) reconciliation processes;
// (ii) tenant initializations (where seed span configs are
// installed).
now := systemTenant.Clock().Now()
testutils.SucceedsSoon(t, func() error {
lastUpdated := kvSubscriber.LastUpdated()
if lastUpdated.Less(now) {
return errors.Newf("kvsubscriber last updated timestamp (%s) lagging barrier timestamp (%s)",
lastUpdated.GoTime(), now.GoTime())
}
return nil
})
checkLastUpdated := func(t *testing.T, n string, c interface{ LastUpdated() hlc.Timestamp }) {
now := systemTenant.Clock().Now()
testutils.SucceedsSoon(t, func() error {
lastUpdated := c.LastUpdated()
if lastUpdated.Less(now) {
return errors.Newf("%s last updated timestamp (%s) lagging barrier timestamp (%s)",
n, lastUpdated.GoTime(), now.GoTime())
}
return nil
})
}
checkLastUpdated(t, "kvsubscriber", kvSubscriber)
checkLastUpdated(t, "systemconfigwatcher", systemConfig)

// As for the gossiped system config span, because we're using a
// single node cluster there's no additional timestamp
Expand Down Expand Up @@ -221,7 +226,7 @@ func TestDataDriven(t *testing.T) {

var reader spanconfig.StoreReader
if version == "legacy" {
reader = underlyingGossip.GetSystemConfig()
reader = systemConfig.GetSystemConfig()
} else {
reader = kvSubscriber
}
Expand All @@ -230,7 +235,7 @@ func TestDataDriven(t *testing.T) {
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", data)

case "diff":
var before, after spanconfig.StoreReader = underlyingGossip.GetSystemConfig(), kvSubscriber
var before, after spanconfig.StoreReader = systemConfig.GetSystemConfig(), kvSubscriber
diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
A: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, before).String()),
B: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, after).String()),
Expand Down
32 changes: 8 additions & 24 deletions pkg/ccl/sqlproxyccl/interceptor/backend_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@ import (
// BackendInterceptor is a server int/erceptor for the Postgres backend protocol.
type BackendInterceptor pgInterceptor

// NewBackendInterceptor creates a BackendInterceptor. bufSize must be at least
// the size of a pgwire message header.
func NewBackendInterceptor(src io.Reader, dst io.Writer, bufSize int) (*BackendInterceptor, error) {
pgi, err := newPgInterceptor(src, dst, bufSize)
if err != nil {
return nil, err
}
return (*BackendInterceptor)(pgi), nil
// NewBackendInterceptor creates a BackendInterceptor using the default buffer
// size of 8K bytes.
func NewBackendInterceptor(src io.Reader) *BackendInterceptor {
return (*BackendInterceptor)(newPgInterceptor(src, defaultBufferSize))
}

// PeekMsg returns the header of the current pgwire message without advancing
Expand All @@ -37,13 +33,6 @@ func (bi *BackendInterceptor) PeekMsg() (typ pgwirebase.ClientMessageType, size
return pgwirebase.ClientMessageType(byteType), size, err
}

// WriteMsg writes the given bytes to the writer dst.
//
// See pgInterceptor.WriteMsg for more information.
func (bi *BackendInterceptor) WriteMsg(data pgproto3.FrontendMessage) (n int, err error) {
return (*pgInterceptor)(bi).WriteMsg(data.Encode(nil))
}

// ReadMsg decodes the current pgwire message and returns a FrontendMessage.
// This also advances the interceptor to the next message.
//
Expand All @@ -53,19 +42,14 @@ func (bi *BackendInterceptor) ReadMsg() (msg pgproto3.FrontendMessage, err error
if err != nil {
return nil, err
}
// errPanicWriter is used here because Receive must not Write.
return pgproto3.NewBackend(newChunkReader(msgBytes), &errPanicWriter{}).Receive()
// errWriter is used here because Receive must not Write.
return pgproto3.NewBackend(newChunkReader(msgBytes), &errWriter{}).Receive()
}

// ForwardMsg sends the current pgwire message to the destination without any
// decoding, and advances the interceptor to the next message.
//
// See pgInterceptor.ForwardMsg for more information.
func (bi *BackendInterceptor) ForwardMsg() (n int, err error) {
return (*pgInterceptor)(bi).ForwardMsg()
}

// Close closes the interceptor, and prevents further operations on it.
func (bi *BackendInterceptor) Close() {
(*pgInterceptor)(bi).Close()
func (bi *BackendInterceptor) ForwardMsg(dst io.Writer) (n int, err error) {
return (*pgInterceptor)(bi).ForwardMsg(dst)
}
Loading

0 comments on commit 892c640

Please sign in to comment.