Skip to content

Commit

Permalink
server: use a shared HTTP listener for all in-memory tenant servers
Browse files Browse the repository at this point in the history
This commit introduces a HTTP (de)multiplexer for all in-memory tenant
servers.

By default, HTTP requests are routed to the system tenant server.
Thsi can be overridden with the header `X-Cockroach-Tenant`.

Release note: None
  • Loading branch information
knz committed Nov 14, 2022
1 parent 788c613 commit c1527b4
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 27 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_test(
"chart_catalog_test.go",
"main_test.go",
"role_authentication_test.go",
"server_controller_test.go",
"server_sql_test.go",
"tenant_decommissioned_host_test.go",
"tenant_vars_test.go",
Expand Down Expand Up @@ -64,6 +65,8 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/distsql",
"//pkg/sql/lexbase",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/tests",
"//pkg/testutils",
Expand All @@ -74,13 +77,15 @@ go_test(
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/envutil",
"//pkg/util/httputil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_elastic_gosigar//:gosigar",
"@com_github_lib_pq//:pq",
"@com_github_prometheus_client_model//go",
Expand Down
129 changes: 129 additions & 0 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package serverccl

import (
"context"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestServerControllerHTTP(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
DisableDefaultTestTenant: true,
})
defer s.Stopper().Stop(ctx)

// Retrieve a privileged HTTP client. NB: this also populates
// system.web_sessions.
url := s.AdminURL()
client, err := s.GetAdminHTTPClient()
require.NoError(t, err)

// Now retrieve the entry in the system tenant's web sessions.
row := db.QueryRow(`SELECT id,"hashedSecret",username,"createdAt","expiresAt" FROM system.web_sessions`)
var id int64
var secret string
var username string
var created, expires time.Time
require.NoError(t, row.Scan(&id, &secret, &username, &created, &expires))

// Create our own test tenant with a known name.
_, err = db.Exec("SELECT crdb_internal.create_tenant(10, 'hello')")
require.NoError(t, err)

// Get a SQL connection to the test tenant.
sqlAddr := s.(*server.TestServer).TestingGetSQLAddrForTenant("hello")
db2 := serverutils.OpenDBConn(t, sqlAddr, "defaultdb", false, s.Stopper())

// Instantiate the HTTP test username and privileges into the test tenant.
_, err = db2.Exec(fmt.Sprintf(`CREATE USER %s`, lexbase.EscapeSQLIdent(username)))
require.NoError(t, err)
_, err = db2.Exec(fmt.Sprintf(`GRANT admin TO %s`, lexbase.EscapeSQLIdent(username)))
require.NoError(t, err)

// Copy the session entry to the test tenant.
_, err = db2.Exec(`INSERT INTO system.web_sessions(id, "hashedSecret", username, "createdAt", "expiresAt")
VALUES($1, $2, $3, $4, $5)`, id, secret, username, created, expires)
require.NoError(t, err)

// From this point, we are expecting the ability to access both tenants using
// the same cookie jar.
// Let's assert this is true by retrieving session lists, asserting
// they are different and that each of them contains the appropriate entries.

// Make our session to the system tenant recognizable in session lists.
_, err = db.Exec("SET application_name = 'hello system'")
require.NoError(t, err)

// Ditto for the test tenant.
_, err = db2.Exec("SET application_name = 'hello hello'")
require.NoError(t, err)

get := func(req *http.Request) (*serverpb.ListSessionsResponse, error) {
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Newf("request failed: %v", resp.StatusCode)
}
if resp.StatusCode != http.StatusOK {
return nil, errors.Newf("request failed: %v / %q", resp.StatusCode, string(body))
}
var ls serverpb.ListSessionsResponse
if err := protoutil.Unmarshal(body, &ls); err != nil {
return nil, err
}
return &ls, err
}

req, err := http.NewRequest("GET", url+"/_status/sessions", nil)
require.NoError(t, err)
req.Header.Set("Content-Type", httputil.ProtoContentType)

// Retrieve the session list for the system tenant.
req.Header.Set(server.TenantSelectHeader, catconstants.SystemTenantName)
body, err := get(req)
require.NoError(t, err)
t.Logf("first response:\n%#v", body)
require.Equal(t, len(body.Sessions), 1)
require.Equal(t, body.Sessions[0].ApplicationName, "hello system")

// Ditto for the test tenant.
req.Header.Set(server.TenantSelectHeader, "hello")
body, err = get(req)
require.NoError(t, err)
t.Logf("second response:\n%#v", body)
require.Equal(t, len(body.Sessions), 1)
require.Equal(t, body.Sessions[0].ApplicationName, "hello hello")
}
6 changes: 6 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ type BaseConfig struct {
// diagnostics to Cockroach Labs.
// Should remain disabled during unit testing.
StartDiagnosticsReporting bool

// DisableOwnHTTPListener prevents this server from starting a TCP
// listener for the HTTP service. Instead, it is expected
// that some other service (typically, the server controller)
// will accept and route requests instead.
DisableOwnHTTPListener bool
}

// MakeBaseConfig returns a BaseConfig with default values.
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,8 @@ func (s *Server) PreStart(ctx context.Context) error {
// Start the admin UI server. This opens the HTTP listen socket,
// optionally sets up TLS, and dispatches the server worker for the
// web UI.
if err := s.http.start(ctx, workersCtx, uiTLSConfig, s.stopper); err != nil {
if err := startHTTPService(ctx,
workersCtx, &s.cfg.BaseConfig, uiTLSConfig, s.stopper, s.serverController.httpMux); err != nil {
return err
}

Expand Down
81 changes: 73 additions & 8 deletions pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ package server

import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -42,6 +44,15 @@ import (
// onDemandServer represents a server that can be started on demand.
type onDemandServer interface {
stop(context.Context)

// getHTTPHandlerFn retrieves the function that can serve HTTP
// requests for this server.
getHTTPHandlerFn() http.HandlerFunc

// testingGetSQLAddr retrieves the address of the SQL listener.
// Used until the following issue is resolved:
// https://github.com/cockroachdb/cockroach/issues/84585
testingGetSQLAddr() string
}

type serverEntry struct {
Expand Down Expand Up @@ -154,6 +165,41 @@ func (c *serverController) Close() {
}
}

// TenantSelectHeader is the HTTP header used to select a particular tenant.
const TenantSelectHeader = `X-Cockroach-Tenant`

// httpMux redirects incoming HTTP requests to the server selected by
// the special HTTP request header.
// If no tenant is specified, the default tenant is used.
func (c *serverController) httpMux(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
tenantName := r.Header.Get(TenantSelectHeader)
if tenantName == "" {
// TODO(knz): Make the default tenant route for HTTP configurable.
// See: https://github.com/cockroachdb/cockroach/issues/91741
tenantName = catconstants.SystemTenantName
}
s, err := c.get(ctx, tenantName)
if err != nil {
log.Warningf(ctx, "unable to find tserver for tenant %q: %v", tenantName, err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "cannot find tenant")
return
}

s.getHTTPHandlerFn()(w, r)
}

// TestingGetSQLAddrForTenant extracts the SQL address for the target tenant.
// Used in tests until https://github.com/cockroachdb/cockroach/issues/84585 is resolved.
func (s *Server) TestingGetSQLAddrForTenant(tenant string) string {
ts, err := s.serverController.get(context.Background(), tenant)
if err != nil {
panic(err)
}
return ts.testingGetSQLAddr()
}

// newServerForTenant is a constructor function suitable for use with newTenantController.
// It instantiates SQL servers for secondary tenants.
func (s *Server) newServerForTenant(
Expand Down Expand Up @@ -206,6 +252,14 @@ func (t *tenantServerWrapper) stop(ctx context.Context) {
t.deregister()
}

func (t *tenantServerWrapper) getHTTPHandlerFn() http.HandlerFunc {
return t.server.http.baseHandler
}

func (t *tenantServerWrapper) testingGetSQLAddr() string {
return t.server.sqlServer.cfg.SQLAddr
}

// systemServerWrapper implements the onDemandServer interface for Server.
//
// TODO(knz): Evaluate if this can be eliminated.
Expand All @@ -219,6 +273,14 @@ func (s *systemServerWrapper) stop(ctx context.Context) {
// Do nothing.
}

func (t *systemServerWrapper) getHTTPHandlerFn() http.HandlerFunc {
return t.s.http.baseHandler
}

func (t *systemServerWrapper) testingGetSQLAddr() string {
return t.s.cfg.SQLAddr
}

// startInMemoryTenantServerInternal starts an in-memory server for the given target tenant ID.
// The resulting stopper should be closed in any case, even when an error is returned.
func (s *Server) startInMemoryTenantServerInternal(
Expand Down Expand Up @@ -339,20 +401,23 @@ func makeInMemoryTenantServerConfig(
// TODO(knz): use a single network interface for all tenant servers.
// See: https://github.com/cockroachdb/cockroach/issues/84585
portOffset := kvServerCfg.Config.SecondaryTenantPortOffset
var err1, err2, err3, err4, err5, err6 error
var err1, err2, err3, err4 error
baseCfg.Addr, err1 = rederivePort(index, kvServerCfg.Config.Addr, "", portOffset)
baseCfg.AdvertiseAddr, err2 = rederivePort(index, kvServerCfg.Config.AdvertiseAddr, baseCfg.Addr, portOffset)
baseCfg.HTTPAddr, err3 = rederivePort(index, kvServerCfg.Config.HTTPAddr, "", portOffset)
baseCfg.HTTPAdvertiseAddr, err4 = rederivePort(index, kvServerCfg.Config.HTTPAdvertiseAddr, baseCfg.HTTPAddr, portOffset)
baseCfg.SQLAddr, err5 = rederivePort(index, kvServerCfg.Config.SQLAddr, "", portOffset)
baseCfg.SQLAdvertiseAddr, err6 = rederivePort(index, kvServerCfg.Config.SQLAdvertiseAddr, baseCfg.SQLAddr, portOffset)
baseCfg.SQLAddr, err3 = rederivePort(index, kvServerCfg.Config.SQLAddr, "", portOffset)
baseCfg.SQLAdvertiseAddr, err4 = rederivePort(index, kvServerCfg.Config.SQLAdvertiseAddr, baseCfg.SQLAddr, portOffset)
if err := errors.CombineErrors(err1,
errors.CombineErrors(err2,
errors.CombineErrors(err3,
errors.CombineErrors(err4,
errors.CombineErrors(err5, err6))))); err != nil {
errors.CombineErrors(err3, err4))); err != nil {
return baseCfg, sqlCfg, err
}

// The parent server will route HTTP requests to us.
baseCfg.DisableOwnHTTPListener = true
// Nevertheless, we like to know our own HTTP address.
baseCfg.HTTPAddr = kvServerCfg.Config.HTTPAddr
baseCfg.HTTPAdvertiseAddr = kvServerCfg.Config.HTTPAdvertiseAddr

// Define the unix socket intelligently.
// See: https://github.com/cockroachdb/cockroach/issues/84585
baseCfg.SocketFile = ""
Expand Down
20 changes: 12 additions & 8 deletions pkg/server/server_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,20 @@ func makeAdminAuthzCheckHandler(
})
}

// start starts the network listener for the HTTP interface
// startHTTPService starts the network listener for the HTTP interface
// and also starts accepting incoming HTTP connections.
func (s *httpServer) start(
ctx, workersCtx context.Context, uiTLSConfig *tls.Config, stopper *stop.Stopper,
func startHTTPService(
ctx, workersCtx context.Context,
cfg *BaseConfig,
uiTLSConfig *tls.Config,
stopper *stop.Stopper,
handler http.HandlerFunc,
) error {
httpLn, err := ListenAndUpdateAddrs(ctx, &s.cfg.HTTPAddr, &s.cfg.HTTPAdvertiseAddr, "http")
httpLn, err := ListenAndUpdateAddrs(ctx, &cfg.HTTPAddr, &cfg.HTTPAdvertiseAddr, "http")
if err != nil {
return err
}
log.Eventf(ctx, "listening on http port %s", s.cfg.HTTPAddr)
log.Eventf(ctx, "listening on http port %s", cfg.HTTPAddr)

// The HTTP listener shutdown worker, which closes everything under
// the HTTP port when the stopper indicates we are shutting down.
Expand Down Expand Up @@ -244,12 +248,12 @@ func (s *httpServer) start(
if err := stopper.RunAsyncTask(workersCtx, "serve-health", func(context.Context) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if HSTSEnabled.Get(&s.cfg.Settings.SV) {
if HSTSEnabled.Get(&cfg.Settings.SV) {
w.Header().Set(hstsHeaderKey, hstsHeaderValue)
}
http.Redirect(w, r, "https://"+r.Host+r.RequestURI, http.StatusTemporaryRedirect)
})
mux.Handle(healthPath, http.HandlerFunc(s.baseHandler))
mux.Handle(healthPath, handler)

plainRedirectServer := netutil.MakeHTTPServer(workersCtx, stopper, nil /* tlsConfig */, mux)

Expand All @@ -263,7 +267,7 @@ func (s *httpServer) start(

// The connManager is responsible for tearing down the net.Conn
// objects when the stopper tells us to shut down.
connManager := netutil.MakeHTTPServer(workersCtx, stopper, uiTLSConfig, http.HandlerFunc(s.baseHandler))
connManager := netutil.MakeHTTPServer(workersCtx, stopper, uiTLSConfig, handler)

// Serve the HTTP endpoint. This will be the original httpLn
// listening on --http-addr without TLS if uiTLSConfig was
Expand Down
24 changes: 14 additions & 10 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,21 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
// Start a context for the asynchronous network workers.
workersCtx := s.AnnotateCtx(context.Background())

// Load the TLS configuration for the HTTP server.
uiTLSConfig, err := s.rpcContext.GetUIServerTLSConfig()
if err != nil {
return err
}
// If DisableOwnHTTPListener is set, we are relying on the HTTP request
// routing performed by the server controller.
if !s.sqlServer.cfg.DisableOwnHTTPListener {
// Load the TLS configuration for the HTTP server.
uiTLSConfig, err := s.rpcContext.GetUIServerTLSConfig()
if err != nil {
return err
}

// Start the admin UI server. This opens the HTTP listen socket,
// optionally sets up TLS, and dispatches the server worker for the
// web UI.
if err := s.http.start(ctx, workersCtx, uiTLSConfig, s.stopper); err != nil {
return err
// Start the admin UI server. This opens the HTTP listen socket,
// optionally sets up TLS, and dispatches the server worker for the
// web UI.
if err := startHTTPService(ctx, workersCtx, s.sqlServer.cfg, uiTLSConfig, s.stopper, s.http.baseHandler); err != nil {
return err
}
}

// Initialize the external storage builders configuration params now that the
Expand Down

0 comments on commit c1527b4

Please sign in to comment.