Skip to content

Commit

Permalink
Merge #75835 #75839
Browse files Browse the repository at this point in the history
75835: server: remove contention event registry from baseStatusServer r=Azhng a=Azhng

Previously, baseStatusServer holds a reference to contention.Registry.
This reference of contention.Registry is used to power the
/ListContentionEvents endpoint. However, this is not ideal for two
reasons:
1. baseStatusServer already holds a reference to *server.SQLServer,
   which in turn contains contention.Registry through its ExecutorConfig
   field. This means that there's no good reason to have another field
   in baseStatusServer to hold this additional reference.
2. The ongoing contention event registry work will make contention
   registry depend on status server to perform transaction ID resolution
   protocol. As it stand today, the status server's construction depends
   on the creation of contention.Registry. By introducing transaction ID
   resolution protocol into contention.Registry, we will be introducing
   a cyclical reference, which can lead to ugly API design.

This commit removes the baseStatusServer's reference on
contention.Registry, and instead directly fetching from executor config.

Release note: None

75839: types: make WithoutTypeModifier less error prone r=mgartner a=otan

WithoutTypeModifier previously did not zero out geometry/geography
correctly, causing a bug in active record (which I cannot reproduce
easily with a test). Nonetheless, this commit fixes that and makes
future types less prone to errors when using `WithoutTypeModifier`.

Release note: None

Co-authored-by: Azhng <archer.xn@gmail.com>
Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
  • Loading branch information
3 people committed Feb 3, 2022
3 parents 088a18b + 39690fc + 9297205 commit a950be5
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 77 deletions.
4 changes: 2 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sAdmin := newAdminServer(lateBoundServer, adminAuthzCheck, internalExecutor)
sHTTP := newHTTPServer(cfg)
sessionRegistry := sql.NewSessionRegistry()
contentionRegistry := contention.NewRegistry()
flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st)

sStatus := newStatusServer(
Expand All @@ -656,10 +655,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
node.stores,
stopper,
sessionRegistry,
contentionRegistry,
flowScheduler,
internalExecutor,
)

contentionRegistry := contention.NewRegistry()
// TODO(tbg): don't pass all of Server into this to avoid this hack.
sAuth := newAuthenticationServer(lateBoundServer)
for i, gw := range []grpcGatewayServer{sAdmin, sStatus, sAuth, &sTS} {
Expand Down
33 changes: 15 additions & 18 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,13 @@ type baseStatusServer struct {
serverpb.UnimplementedStatusServer

log.AmbientContext
privilegeChecker *adminPrivilegeChecker
sessionRegistry *sql.SessionRegistry
contentionRegistry *contention.Registry
flowScheduler *flowinfra.FlowScheduler
st *cluster.Settings
sqlServer *SQLServer
rpcCtx *rpc.Context
stopper *stop.Stopper
privilegeChecker *adminPrivilegeChecker
sessionRegistry *sql.SessionRegistry
flowScheduler *flowinfra.FlowScheduler
st *cluster.Settings
sqlServer *SQLServer
rpcCtx *rpc.Context
stopper *stop.Stopper
}

// getLocalSessions returns a list of local sessions on this node. Note that the
Expand Down Expand Up @@ -307,7 +306,7 @@ func (b *baseStatusServer) ListLocalContentionEvents(
}

return &serverpb.ListContentionEventsResponse{
Events: b.contentionRegistry.Serialize(),
Events: b.sqlServer.execCfg.ContentionRegistry.Serialize(),
}, nil
}

Expand Down Expand Up @@ -415,21 +414,19 @@ func newStatusServer(
stores *kvserver.Stores,
stopper *stop.Stopper,
sessionRegistry *sql.SessionRegistry,
contentionRegistry *contention.Registry,
flowScheduler *flowinfra.FlowScheduler,
internalExecutor *sql.InternalExecutor,
) *statusServer {
ambient.AddLogTag("status", nil)
server := &statusServer{
baseStatusServer: &baseStatusServer{
AmbientContext: ambient,
privilegeChecker: adminAuthzCheck,
sessionRegistry: sessionRegistry,
contentionRegistry: contentionRegistry,
flowScheduler: flowScheduler,
st: st,
rpcCtx: rpcCtx,
stopper: stopper,
AmbientContext: ambient,
privilegeChecker: adminAuthzCheck,
sessionRegistry: sessionRegistry,
flowScheduler: flowScheduler,
st: st,
rpcCtx: rpcCtx,
stopper: stopper,
},
cfg: cfg,
admin: adminServer,
Expand Down
5 changes: 2 additions & 3 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ func StartTenant(
// the SQL server object.
tenantStatusServer := newTenantStatusServer(
baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor},
args.sessionRegistry, args.contentionRegistry, args.flowScheduler, baseCfg.Settings, nil,
args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil,
args.rpcContext, args.stopper,
)
args.contentionRegistry = contention.NewRegistry()
args.sqlStatusServer = tenantStatusServer
s, err := newSQLServer(ctx, args)
tenantStatusServer.sqlServer = s
Expand Down Expand Up @@ -499,7 +500,6 @@ func makeTenantSQLServerArgs(
// writing): the blob service and DistSQL.
dummyRPCServer := rpc.NewServer(rpcContext)
sessionRegistry := sql.NewSessionRegistry()
contentionRegistry := contention.NewRegistry()
flowScheduler := flowinfra.NewFlowScheduler(baseCfg.AmbientCtx, stopper, st)
return sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
Expand Down Expand Up @@ -535,7 +535,6 @@ func makeTenantSQLServerArgs(
registry: registry,
recorder: recorder,
sessionRegistry: sessionRegistry,
contentionRegistry: contentionRegistry,
flowScheduler: flowScheduler,
circularInternalExecutor: circularInternalExecutor,
circularJobRegistry: circularJobRegistry,
Expand Down
18 changes: 8 additions & 10 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func newTenantStatusServer(
ambient log.AmbientContext,
privilegeChecker *adminPrivilegeChecker,
sessionRegistry *sql.SessionRegistry,
contentionRegistry *contention.Registry,
flowScheduler *flowinfra.FlowScheduler,
st *cluster.Settings,
sqlServer *SQLServer,
Expand All @@ -87,15 +86,14 @@ func newTenantStatusServer(
ambient.AddLogTag("tenant-status", nil)
return &tenantStatusServer{
baseStatusServer: baseStatusServer{
AmbientContext: ambient,
privilegeChecker: privilegeChecker,
sessionRegistry: sessionRegistry,
contentionRegistry: contentionRegistry,
flowScheduler: flowScheduler,
st: st,
sqlServer: sqlServer,
rpcCtx: rpcCtx,
stopper: stopper,
AmbientContext: ambient,
privilegeChecker: privilegeChecker,
sessionRegistry: sessionRegistry,
flowScheduler: flowScheduler,
st: st,
sqlServer: sqlServer,
rpcCtx: rpcCtx,
stopper: stopper,
},
}
}
Expand Down
60 changes: 19 additions & 41 deletions pkg/sql/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,15 @@ func MakeChar(width int32) *T {
Family: StringFamily, Oid: oid.T_bpchar, Width: width, Locale: &emptyLocale}}
}

// oidCanBeCollatedString returns true if the given oid is can be a CollatedString.
func oidCanBeCollatedString(o oid.Oid) bool {
switch o {
case oid.T_text, oid.T_varchar, oid.T_bpchar, oid.T_char, oid.T_name:
return true
}
return false
}

// MakeCollatedString constructs a new instance of a CollatedStringFamily type
// that is collated according to the given locale. The new type is based upon
// the given string type, having the same oid and width values. For example:
Expand All @@ -851,8 +860,7 @@ func MakeChar(width int32) *T {
// VARCHAR(20) => VARCHAR(20) COLLATE EN
//
func MakeCollatedString(strType *T, locale string) *T {
switch strType.Oid() {
case oid.T_text, oid.T_varchar, oid.T_bpchar, oid.T_char, oid.T_name:
if oidCanBeCollatedString(strType.Oid()) {
return &T{InternalType: InternalType{
Family: CollatedStringFamily, Oid: strType.Oid(), Width: strType.Width(), Locale: &locale}}
}
Expand Down Expand Up @@ -1278,50 +1286,20 @@ func (t *T) WithoutTypeModifiers() *T {
return t
}

switch t.Oid() {
case oid.T_bit:
return MakeBit(0)
case oid.T_bpchar, oid.T_char, oid.T_text, oid.T_varchar:
// For string-like types, we copy the type and set the width to 0 rather
// than returning typeBpChar, typeQChar, VarChar, or String so that
// we retain the locale value if the type is collated.
// For types that can be a collated string, we copy the type and set the width
// to 0 rather than returning the default OidToType type so that we retain the
// locale value if the type is collated.
if oidCanBeCollatedString(t.Oid()) {
newT := *t
newT.InternalType.Width = 0
return &newT
case oid.T_interval:
return Interval
case oid.T_numeric:
return Decimal
case oid.T_time:
return Time
case oid.T_timestamp:
return Timestamp
case oid.T_timestamptz:
return TimestampTZ
case oid.T_timetz:
return TimeTZ
case oid.T_varbit:
return VarBit
case oid.T_anyelement,
oid.T_bool,
oid.T_bytea,
oid.T_date,
oidext.T_box2d,
oid.T_float4, oid.T_float8,
oidext.T_geography, oidext.T_geometry,
oid.T_inet,
oid.T_int2, oid.T_int4, oid.T_int8,
oid.T_jsonb,
oid.T_name,
oid.T_oid,
oid.T_regclass, oid.T_regnamespace, oid.T_regproc, oid.T_regprocedure, oid.T_regrole, oid.T_regtype,
oid.T_unknown,
oid.T_uuid,
oid.T_void:
return t
default:
}

t, ok := OidToType[t.Oid()]
if !ok {
panic(errors.AssertionFailedf("unexpected OID: %d", t.Oid()))
}
return t
}

// Scale is an alias method for Width, used for clarity for types in
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,8 @@ func TestWithoutTypeModifiers(t *testing.T) {
{MakeArray(MakeDecimal(5, 1)), DecimalArray},
{MakeTuple([]*T{MakeString(2), Time, MakeDecimal(5, 1)}),
MakeTuple([]*T{String, Time, Decimal})},
{MakeGeography(geopb.ShapeType_Point, 3857), Geography},
{MakeGeometry(geopb.ShapeType_PointZ, 4326), Geometry},

// Types without modifiers.
{Bool, Bool},
Expand All @@ -1026,8 +1028,10 @@ func TestWithoutTypeModifiers(t *testing.T) {
}

for _, tc := range testCases {
if actual := tc.t.WithoutTypeModifiers(); !actual.Identical(tc.expected) {
t.Errorf("expected <%v>, got <%v>", tc.expected.DebugString(), actual.DebugString())
}
t.Run(tc.t.SQLString(), func(t *testing.T) {
if actual := tc.t.WithoutTypeModifiers(); !actual.Identical(tc.expected) {
t.Errorf("expected <%v>, got <%v>", tc.expected.DebugString(), actual.DebugString())
}
})
}
}

0 comments on commit a950be5

Please sign in to comment.