Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgwire: lift the PreServe() call to the server package #92578

Merged
merged 16 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 68 additions & 47 deletions pkg/cli/democluster/demo_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ type transientCluster struct {
tenantServers []serverutils.TestTenantInterface
defaultDB string

httpFirstPort int
sqlFirstPort int
rpcFirstPort int

adminPassword string
adminUser username.SQLUsername

Expand Down Expand Up @@ -205,26 +201,6 @@ func NewDemoCluster(
}
}

c.httpFirstPort = c.demoCtx.HTTPPort
c.sqlFirstPort = c.demoCtx.SQLPort
// +100 is the offset we've chosen to recommend to separate the SQL
// port from the RPC port when we deprecated the use of merged
// ports.
c.rpcFirstPort = c.demoCtx.SQLPort + 100
if c.demoCtx.Multitenant {
// This allows the first secondary tenant server to get the
// desired ports (i.e., those configured by --http-port or
// --sql-port, or the default) without conflicting with the system
// tenant.
// Note: this logic can be removed once we use a single
// listener for HTTP and SQL.
if c.demoCtx.DisableServerController {
c.httpFirstPort += c.demoCtx.NumNodes
}
c.sqlFirstPort += c.demoCtx.NumNodes
c.rpcFirstPort += c.demoCtx.NumNodes
}

c.stickyEngineRegistry = server.NewStickyInMemEnginesRegistry()
return c, nil
}
Expand Down Expand Up @@ -449,8 +425,8 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
args.SSLCertsDir = c.demoDir
args.DisableTLSForHTTP = true
args.EnableDemoLoginEndpoint = true
args.StartingRPCAndSQLPort = c.demoCtx.SQLPort - secondaryTenantID + i
args.StartingHTTPPort = c.demoCtx.HTTPPort - secondaryTenantID + i
args.StartingRPCAndSQLPort = c.demoCtx.sqlPort(i, true) - secondaryTenantID
args.StartingHTTPPort = c.demoCtx.httpPort(i, true) - secondaryTenantID
args.Locality = c.demoCtx.Localities[i]
}

Expand Down Expand Up @@ -586,9 +562,6 @@ func (c *transientCluster) createAndAddNode(
}
args := c.demoCtx.testServerArgsForTransientCluster(
c.sockForServer(idx), idx, joinAddr, c.demoDir,
c.sqlFirstPort,
c.rpcFirstPort,
c.httpFirstPort,
c.stickyEngineRegistry,
)
if idx == 0 {
Expand Down Expand Up @@ -795,14 +768,72 @@ func (c *transientCluster) waitForSQLReadiness(
return nil
}

func (demoCtx *Context) sqlPort(serverIdx int, forSecondaryTenant bool) int {
if demoCtx.SQLPort == 0 || testingForceRandomizeDemoPorts {
return 0
}
if !demoCtx.Multitenant {
// No multitenancy: just one port per node.
return demoCtx.SQLPort + serverIdx
}
if forSecondaryTenant {
// The port number of the secondary tenant is always
// the "base" port number.
return demoCtx.SQLPort + serverIdx
}
// System tenant.

// Currently using a separate SQL listener. System tenant uses port number
// offset by NumNodes.
return demoCtx.SQLPort + serverIdx + demoCtx.NumNodes
}

func (demoCtx *Context) httpPort(serverIdx int, forSecondaryTenant bool) int {
if demoCtx.HTTPPort == 0 || testingForceRandomizeDemoPorts {
return 0
}
if !demoCtx.Multitenant {
// No multitenancy: just one port per node.
return demoCtx.HTTPPort + serverIdx
}
if forSecondaryTenant {
// The port number of the secondary tenant is always
// the "base" port number.
return demoCtx.HTTPPort + serverIdx
}
// System tenant.
if !demoCtx.DisableServerController {
// Using server controller: same port for app and system tenant.
return demoCtx.HTTPPort + serverIdx
}
// Not using server controller: http port is offset by number of nodes.
return demoCtx.HTTPPort + serverIdx + demoCtx.NumNodes
}

func (demoCtx *Context) rpcPort(serverIdx int, forSecondaryTenant bool) int {
if demoCtx.SQLPort == 0 || testingForceRandomizeDemoPorts {
return 0
}
// +100 is the offset we've chosen to recommend to separate the SQL
// port from the RPC port when we deprecated the use of merged
// ports.
if !demoCtx.Multitenant {
return demoCtx.SQLPort + serverIdx + 100
}
if forSecondaryTenant {
return demoCtx.SQLPort + serverIdx + 100
}
// System tenant.
return demoCtx.SQLPort + serverIdx + demoCtx.NumNodes + 100
}

// testServerArgsForTransientCluster creates the test arguments for
// a necessary server in the demo cluster.
func (demoCtx *Context) testServerArgsForTransientCluster(
sock unixSocketDetails,
serverIdx int,
joinAddr string,
demoDir string,
sqlBasePort, rpcBasePort, httpBasePort int,
stickyEngineRegistry server.StickyInMemEnginesRegistry,
) base.TestServerArgs {
// Assign a path to the store spec, to be saved.
Expand Down Expand Up @@ -841,23 +872,10 @@ func (demoCtx *Context) testServerArgsForTransientCluster(
// `make stress`. This is bound to not work with fixed ports.
// So by default we use :0 to auto-allocate ports.
args.Addr = "127.0.0.1:0"
if !testingForceRandomizeDemoPorts {
sqlPort := sqlBasePort + serverIdx
if sqlBasePort == 0 {
sqlPort = 0
}
rpcPort := rpcBasePort + serverIdx
if rpcBasePort == 0 {
rpcPort = 0
}
httpPort := httpBasePort + serverIdx
if httpBasePort == 0 {
httpPort = 0
}
if sqlPort := demoCtx.sqlPort(serverIdx, false /* forSecondaryTenant */); sqlPort != 0 {
rpcPort := demoCtx.rpcPort(serverIdx, false)
args.Addr = fmt.Sprintf("127.0.0.1:%d", rpcPort)
args.SQLAddr = fmt.Sprintf("127.0.0.1:%d", sqlPort)
args.HTTPAddr = fmt.Sprintf("127.0.0.1:%d", httpPort)

if !demoCtx.DisableServerController {
// The code in NewDemoCluster put the KV ports higher
// so we need to subtract the number of nodes to get
Expand All @@ -867,6 +885,9 @@ func (demoCtx *Context) testServerArgsForTransientCluster(
args.SecondaryTenantPortOffset = -(demoCtx.NumNodes + 1)
}
}
if httpPort := demoCtx.httpPort(serverIdx, false /* forSecondaryTenant */); httpPort != 0 {
args.HTTPAddr = fmt.Sprintf("127.0.0.1:%d", httpPort)
}

if demoCtx.Localities != nil {
args.Locality = demoCtx.Localities[serverIdx]
Expand Down Expand Up @@ -1074,7 +1095,7 @@ func (c *transientCluster) startServerInternal(
c.sockForServer(serverIdx),
serverIdx,
c.firstServer.ServingRPCAddr(), c.demoDir,
c.sqlFirstPort, c.rpcFirstPort, c.httpFirstPort, c.stickyEngineRegistry)
c.stickyEngineRegistry)
s, err := server.TestServerFactory.New(args)
if err != nil {
return 0, err
Expand Down Expand Up @@ -1707,7 +1728,7 @@ func (c *transientCluster) sockForServer(serverIdx int) unixSocketDetails {
if !c.useSockets {
return unixSocketDetails{}
}
port := strconv.Itoa(c.sqlFirstPort + serverIdx)
port := strconv.Itoa(c.demoCtx.sqlPort(serverIdx, false))
databaseName := c.defaultDB
if c.demoCtx.Multitenant {
// TODO(knz): for now, we only define the unix socket for the
Expand Down
13 changes: 7 additions & 6 deletions pkg/cli/democluster/demo_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
PartOfCluster: true,
JoinAddr: "127.0.0.1",
DisableTLSForHTTP: true,
Addr: "127.0.0.1:7890",
Addr: "127.0.0.1:1334",
SQLAddr: "127.0.0.1:1234",
HTTPAddr: "127.0.0.1:4567",
SecondaryTenantPortOffset: -2,
Expand All @@ -95,7 +95,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
DisableDefaultTestTenant: true,
PartOfCluster: true,
JoinAddr: "127.0.0.1",
Addr: "127.0.0.1:7892",
Addr: "127.0.0.1:1336",
SQLAddr: "127.0.0.1:1236",
HTTPAddr: "127.0.0.1:4569",
SecondaryTenantPortOffset: -2,
Expand All @@ -118,8 +118,9 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
demoCtx := newDemoCtx()
demoCtx.SQLPoolMemorySize = tc.sqlPoolMemorySize
demoCtx.CacheSize = tc.cacheSize

actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", 1234, 7890, 4567, stickyEnginesRegistry)
demoCtx.SQLPort = 1234
demoCtx.HTTPPort = 4567
actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyEnginesRegistry)
stopper := actual.Stopper
defer stopper.Stop(context.Background())

Expand Down Expand Up @@ -259,8 +260,8 @@ func TestTransientClusterMultitenant(t *testing.T) {

// This test is too slow to complete under the race detector, sometimes.
skip.UnderRace(t)
skip.UnderStress(t)
skip.WithIssue(t, 94862)

defer TestingForceRandomizeDemoPorts()()

demoCtx := newDemoCtx()
// Set up an empty 3-node cluster with tenants on each node.
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/interactive_tests/test_copy.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ start_test "Check EOF and \. works as appropriate during COPY"
send "COPY t FROM STDIN CSV;\r"
eexpect ">>"
send "1,text with semicolon;\r"
eexpect ">>"
send "2,beat chef@;\r"
eexpect ">>"
send "3,more&text\r"
eexpect ">>"
send "\\.\r"

eexpect "COPY 3"
Expand Down
Loading