Skip to content

Commit

Permalink
Merge #32366
Browse files Browse the repository at this point in the history
32366: sql: add a sql.defaults.conn_results_buffer_size cluster setting r=andreimatei a=andreimatei

... controlling the buffering of results on (all) future connections.
This has come up with users enough to warrant it: some would rather
buffer more to get more auto-retries.
Controlling this for each connection individually is not done.

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Nov 26, 2018
2 parents 169140a + f25ac68 commit 9e75806
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 64 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>1</code></td><td>default distributed SQL execution mode [off = 0, auto = 1, on = 2, 2.0-off = 3, 2.0-auto = 4]</td></tr>
<tr><td><code>sql.defaults.experimental_vectorize</code></td><td>boolean</td><td><code>false</code></td><td>default experimental_vectorize mode</td></tr>
<tr><td><code>sql.defaults.optimizer</code></td><td>enumeration</td><td><code>1</code></td><td>default cost-based optimizer mode [off = 0, on = 1, local = 2]</td></tr>
<tr><td><code>sql.defaults.results_buffer.size</code></td><td>byte size</td><td><code>16 KiB</code></td><td>size of the buffer that accumulates results for a statement or a batch of statements before they are sent to the client. Note that auto-retries generally only happen while no results have been delivered to the client, so reducing this size can increase the number of retriable errors a client receives. On the other hand, increasing the buffer size can increase the delay until the client receives the first result row. Updating the setting only affects new connections. Setting to 0 disables any buffering.</td></tr>
<tr><td><code>sql.defaults.serial_normalization</code></td><td>enumeration</td><td><code>0</code></td><td>default handling of SERIAL in table definitions [rowid = 0, virtual_sequence = 1, sql_sequence = 2]</td></tr>
<tr><td><code>sql.distsql.distribute_index_joins</code></td><td>boolean</td><td><code>true</code></td><td>if set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader</td></tr>
<tr><td><code>sql.distsql.flow_stream_timeout</code></td><td>duration</td><td><code>10s</code></td><td>amount of time incoming streams wait for a flow to be set up before erroring out</td></tr>
Expand Down
5 changes: 0 additions & 5 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ type TestServerArgs struct {
// If set, web session authentication will be disabled, even if the server
// is running in secure mode.
DisableWebSessionAuthentication bool

// ConnResultsBufferBytes is the size of the buffer in which each connection
// accumulates results set. Results are flushed to the network when this
// buffer overflows.
ConnResultsBufferBytes int
}

// TestClusterArgs contains the parameters one can set when creating a test
Expand Down
25 changes: 19 additions & 6 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,17 +686,30 @@ func sinklessTest(testFn func(*testing.T, *gosql.DB, testfeedFactory)) func(*tes
ctx := context.Background()
knobs := base.TestingKnobs{DistSQL: &distsqlrun.TestingKnobs{Changefeed: &TestingKnobs{}}}
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
UseDatabase: "d",
Knobs: knobs,
// TODO(dan): HACK until the changefeed can control pgwire flushing.
ConnResultsBufferBytes: 1,
Knobs: knobs,
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`)
// TODO(dan): HACK until the changefeed can control pgwire flushing.
sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.results_buffer.size = '0'`)
sqlDB.Exec(t, `CREATE DATABASE d`)
f := makeSinkless(s, db)
testFn(t, db, f)

// Now that we've updated sql.defaults.results_buffer.size, open a new
// conn pool so that connections use the new setting.
pgURL, cleanupFunc := sqlutils.PGUrl(
t, s.ServingAddr(), "sinklessTest" /* prefix */, url.User(security.RootUser),
)
defer cleanupFunc()
pgURL.Path = "d"
noBufferDB, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer noBufferDB.Close()

f := makeSinkless(s, noBufferDB)
testFn(t, noBufferDB, f)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (s *authenticationServer) verifyPassword(
ctx context.Context, username string, password string,
) (bool, error) {
exists, hashedPassword, err := sql.GetUserHashedPassword(
ctx, s.server.execCfg, s.memMetrics, username,
ctx, s.server.execCfg.InternalExecutor, s.memMetrics, username,
)
if err != nil {
return false, err
Expand Down
11 changes: 0 additions & 11 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ const (
minimumNetworkFileDescriptors = 256
recommendedNetworkFileDescriptors = 5000

defaultConnResultsBufferBytes = 16 << 10 // 16 KiB

defaultSQLTableStatCacheSize = 256
)

Expand Down Expand Up @@ -254,11 +252,6 @@ type Config struct {
// the Admin API's HTTP endpoints.
EnableWebSessionAuthentication bool

// ConnResultsBufferBytes is the size of the buffer in which each connection
// accumulates results set. Results are flushed to the network when this
// buffer overflows.
ConnResultsBufferBytes int

enginesCreated bool
}

Expand Down Expand Up @@ -343,10 +336,6 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {
},
TempStorageConfig: base.TempStorageConfigFromEnv(
ctx, st, storeSpec, "" /* parentDir */, base.DefaultTempStorageMaxSizeBytes, 0),
// TODO(dan): Hack. Remove this env override once changefeeds have
// control over buffering.
ConnResultsBufferBytes: envutil.EnvOrDefaultInt(
"COCKROACH_CONN_RESULTS_BUFFER_BYTES", defaultConnResultsBufferBytes),
}
cfg.AmbientCtx.Tracer = st.Tracer

Expand Down
2 changes: 0 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
AuditLogger: log.NewSecondaryLogger(
s.cfg.SQLAuditLogDirName, "sql-audit", true /*enableGc*/, true, /*forceSyncWrites*/
),

ConnResultsBufferBytes: s.cfg.ConnResultsBufferBytes,
}

if sqlSchemaChangerTestingKnobs := s.cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil {
Expand Down
5 changes: 0 additions & 5 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,6 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
cfg.TestingKnobs.Store = &storage.StoreTestingKnobs{}
}
cfg.TestingKnobs.Store.(*storage.StoreTestingKnobs).SkipMinSizeCheck = true

if params.ConnResultsBufferBytes != 0 {
cfg.ConnResultsBufferBytes = params.ConnResultsBufferBytes
}

return cfg
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,6 @@ SELECT * FROM t.kv%d
func TestCreateStatementType(t *testing.T) {
defer leaktest.AfterTest(t)()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// Make the connections' results buffers really small so that it overflows
// when we produce a few results.
ConnResultsBufferBytes: 10,
// Andrei is too lazy to figure out the incantation for telling pgx about
// our test certs.
Insecure: true,
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ type ExecutorConfig struct {
// Caches updated by DistSQL.
RangeDescriptorCache *kv.RangeDescriptorCache
LeaseHolderCache *kv.LeaseHolderCache

// ConnResultsBufferBytes is the size of the buffer in which each connection
// accumulates results set. Results are flushed to the network when this
// buffer overflows.
ConnResultsBufferBytes int
}

// Organization returns the value of cluster.organization.
Expand Down
33 changes: 18 additions & 15 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ const (
type conn struct {
conn net.Conn

sessionArgs sql.SessionArgs
execCfg *sql.ExecutorConfig
metrics *ServerMetrics
sessionArgs sql.SessionArgs
resultsBufferBytes int64
metrics *ServerMetrics

// rd is a buffered reader consuming conn. All reads from conn go through
// this.
Expand Down Expand Up @@ -136,11 +136,12 @@ func serveConn(
ctx context.Context,
netConn net.Conn,
sArgs sql.SessionArgs,
resultsBufferBytes int64,
metrics *ServerMetrics,
reserved mon.BoundAccount,
sqlServer *sql.Server,
draining func() bool,
execCfg *sql.ExecutorConfig,
ie *sql.InternalExecutor,
stopper *stop.Stopper,
insecure bool,
) error {
Expand All @@ -150,9 +151,9 @@ func serveConn(
log.Infof(ctx, "new connection with options: %+v", sArgs)
}

c := newConn(netConn, sArgs, metrics, execCfg)
c := newConn(netConn, sArgs, metrics, resultsBufferBytes)

if err := c.handleAuthentication(ctx, insecure); err != nil {
if err := c.handleAuthentication(ctx, insecure, ie); err != nil {
_ = c.conn.Close()
reserved.Close(ctx)
return err
Expand All @@ -164,14 +165,14 @@ func serveConn(
}

func newConn(
netConn net.Conn, sArgs sql.SessionArgs, metrics *ServerMetrics, execCfg *sql.ExecutorConfig,
netConn net.Conn, sArgs sql.SessionArgs, metrics *ServerMetrics, resultsBufferBytes int64,
) *conn {
c := &conn{
conn: netConn,
sessionArgs: sArgs,
execCfg: execCfg,
metrics: metrics,
rd: *bufio.NewReader(netConn),
conn: netConn,
sessionArgs: sArgs,
resultsBufferBytes: resultsBufferBytes,
metrics: metrics,
rd: *bufio.NewReader(netConn),
}
c.stmtBuf.Init()
c.writerState.fi.buf = &c.writerState.buf
Expand Down Expand Up @@ -1095,7 +1096,7 @@ func (c *conn) Flush(pos sql.CmdPos) error {
// maybeFlush flushes the buffer to the network connection if it exceeded
// connResultsBufferSizeBytes.
func (c *conn) maybeFlush(pos sql.CmdPos) (bool, error) {
if c.writerState.buf.Len() <= c.execCfg.ConnResultsBufferBytes {
if int64(c.writerState.buf.Len()) <= c.resultsBufferBytes {
return false, nil
}
return true, c.Flush(pos)
Expand Down Expand Up @@ -1261,7 +1262,9 @@ func (r *pgwireReader) ReadByte() (byte, error) {
// name, if different from the one given initially. Note: at this
// point the sql.Session does not exist yet! If need exists to access the
// database to look up authentication data, use the internal executor.
func (c *conn) handleAuthentication(ctx context.Context, insecure bool) error {
func (c *conn) handleAuthentication(
ctx context.Context, insecure bool, ie *sql.InternalExecutor,
) error {
sendError := func(err error) error {
_ /* err */ = writeErr(err, &c.msgBuilder, c.conn)
return err
Expand All @@ -1270,7 +1273,7 @@ func (c *conn) handleAuthentication(ctx context.Context, insecure bool) error {
// Check that the requested user exists and retrieve the hashed
// password in case password authentication is needed.
exists, hashedPassword, err := sql.GetUserHashedPassword(
ctx, c.execCfg, &c.metrics.SQLMemMetrics, c.sessionArgs.User,
ctx, ie, &c.metrics.SQLMemMetrics, c.sessionArgs.User,
)
if err != nil {
return sendError(err)
Expand Down
34 changes: 28 additions & 6 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package pgwire
import (
"bytes"
"context"
gosql "database/sql"
"io"
"io/ioutil"
"net"
"net/url"
"strconv"
"strings"
"sync"
Expand All @@ -31,6 +33,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
Expand Down Expand Up @@ -329,7 +332,7 @@ func waitForClientConn(ln net.Listener) (*conn, error) {
}

metrics := makeServerMetrics(sql.MemoryMetrics{} /* sqlMemMetrics */, metric.TestSampleInterval)
pgwireConn := newConn(conn, sql.SessionArgs{}, &metrics, &sql.ExecutorConfig{})
pgwireConn := newConn(conn, sql.SessionArgs{}, &metrics, 16<<10 /* resultsBufferBytes */)
return pgwireConn, nil
}

Expand Down Expand Up @@ -649,17 +652,31 @@ var _ pgx.Logger = pgxTestLogger{}
func TestConnClose(t *testing.T) {
defer leaktest.AfterTest(t)()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
// Make the connections' results buffers really small so that it overflows
// when we produce a few results.
ConnResultsBufferBytes: 10,
// Andrei is too lazy to figure out the incantation for telling pgx about
// our test certs.
Insecure: true,
})
ctx := context.TODO()
defer s.Stopper().Stop(ctx)

r := sqlutils.MakeSQLRunner(db)
// Disable results buffering.
if _, err := db.Exec(
`SET CLUSTER SETTING sql.defaults.results_buffer.size = '0'`,
); err != nil {
t.Fatal(err)
}
pgURL, cleanupFunc := sqlutils.PGUrl(
t, s.ServingAddr(), "testConnClose" /* prefix */, url.User(security.RootUser),
)
pgURL.RawQuery = "sslmode=disable"
defer cleanupFunc()
noBufferDB, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer noBufferDB.Close()

r := sqlutils.MakeSQLRunner(noBufferDB)
r.Exec(t, "CREATE DATABASE test")
r.Exec(t, "CREATE TABLE test.test AS SELECT * FROM generate_series(1,100)")

Expand Down Expand Up @@ -764,7 +781,12 @@ func TestMaliciousInputs(t *testing.T) {
sqlMetrics := sql.MakeMemMetrics("test" /* endpoint */, time.Second /* histogramWindow */)
metrics := makeServerMetrics(sqlMetrics, time.Second /* histogramWindow */)

conn := newConn(r, sql.SessionArgs{}, &metrics, nil /* execCfg */)
conn := newConn(
r, sql.SessionArgs{}, &metrics,
// resultsBufferBytes - really small so that it overflows when we
// produce a few results.
10,
)
// Ignore the error from serveImpl. There might be one when the client
// sends malformed input.
_ /* err */ = conn.serveImpl(
Expand Down
28 changes: 26 additions & 2 deletions pkg/sql/pgwire/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -41,6 +42,26 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// ATTENTION: After changing this value in a unit test, you probably want to
// open a new connection pool since the connections in the existing one are not
// affected.
//
// TODO(andrei): This setting is under "sql.defaults", but there's no way to
// control the setting on a per-connection basis. We should introduce a
// corresponding session variable.
var connResultsBufferSize = settings.RegisterByteSizeSetting(
"sql.defaults.results_buffer.size",
"size of the buffer that accumulates results for a statement or a batch "+
"of statements before they are sent to the client. Note that auto-retries "+
"generally only happen while no results have been delivered to the client, so "+
"reducing this size can increase the number of retriable errors a client "+
"receives. On the other hand, increasing the buffer size can increase the "+
"delay until the client receives the first result row. "+
"Updating the setting only affects new connections. "+
"Setting to 0 disables any buffering.",
16<<10, // 16 KiB
)

const (
// ErrSSLRequired is returned when a client attempts to connect to a
// secure server in cleartext.
Expand Down Expand Up @@ -452,8 +473,11 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn) error {
return errors.Errorf("unable to pre-allocate %d bytes for this connection: %v",
baseSQLMemoryBudget, err)
}
return serveConn(ctx, conn, sArgs, &s.metrics, reserved, s.SQLServer,
s.IsDraining, s.execCfg, s.stopper, s.cfg.Insecure)
return serveConn(
ctx, conn, sArgs,
connResultsBufferSize.Get(&s.execCfg.Settings.SV),
&s.metrics, reserved, s.SQLServer,
s.IsDraining, s.execCfg.InternalExecutor, s.stopper, s.cfg.Insecure)
}

func parseOptions(ctx context.Context, data []byte) (sql.SessionArgs, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// GetUserHashedPassword returns the hashedPassword for the given username if
// found in system.users.
func GetUserHashedPassword(
ctx context.Context, execCfg *ExecutorConfig, metrics *MemoryMetrics, username string,
ctx context.Context, ie *InternalExecutor, metrics *MemoryMetrics, username string,
) (bool, []byte, error) {
normalizedUsername := tree.Name(username).Normalize()
// Always return no password for the root user, even if someone manually inserts one.
Expand All @@ -37,7 +37,7 @@ func GetUserHashedPassword(

const getHashedPassword = `SELECT "hashedPassword" FROM system.users ` +
`WHERE username=$1 AND "isRole" = false`
values, err := execCfg.InternalExecutor.QueryRow(
values, err := ie.QueryRow(
ctx, "get-hashed-pwd", nil /* txn */, getHashedPassword, normalizedUsername)
if err != nil {
return false, nil, errors.Wrapf(err, "error looking up user %s", normalizedUsername)
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func StartServer(
}

pgURL, cleanupGoDB := sqlutils.PGUrl(
t, server.ServingAddr(), "StartServer", url.User(security.RootUser))
t, server.ServingAddr(), "StartServer" /* prefix */, url.User(security.RootUser))
pgURL.Path = params.UseDatabase
if params.Insecure {
pgURL.RawQuery = "sslmode=disable"
Expand Down

0 comments on commit 9e75806

Please sign in to comment.