88419: ui: add connected components for insights r=ericharmeling a=ericharmeling

This commit adds connected components for the workload and schema insights pages, for use in the CC Console.

Fixes #87693.

https://www.loom.com/share/08163a7c125948119ca71ac097099a29

Release note: None

88510: ui: move `latestQuery`, `latestFormattedQuery` from redux to local state r=xinhaoz a=xinhaoz

Previously, the fields `latestQuery` and `latestFormattedQuery`, which hold the latest non-empty query string for a statement viewed from the details page, were stored in redux. The purpose of these fields was to preserve the query when changing tabs on the statement details page. Storing them in the redux store was unnecessary, so this commit moves these fields to the statement details page's local state.

Release note: None


https://www.loom.com/share/a28d412fb83a429391210935982404de

88594: cloud: add version gate for auth via assume role in AWS and GCP storage and KMS r=rhu713 a=rhu713

Add a version gate for auth via assume role in AWS and GCP storage and KMS to prevent this type of auth until all nodes in the cluster have been upgraded to 22.2. The gate prevents a class of job failures in which a job can sometimes succeed with assume-role auth because its processors all happen to be on 22.2 nodes, but fail at other times when one of its processor nodes doesn't support assume role. The version gate preempts the issue by disallowing this type of auth until the cluster has been finalized on 22.2, and it gives a clearer error message explaining why the auth cannot be used.
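Roughly, each gated storage/KMS constructor now performs a check of the following shape before honoring an assume-role parameter. This is a minimal sketch (the helper function and package name are illustrative only, not part of the actual change), mirroring the inline checks visible in the diff below:

```go
package cloudexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/errors"
)

// checkAssumeRoleGate rejects assume-role auth until every node runs 22.2
// and the cluster version has been finalized.
func checkAssumeRoleGate(ctx context.Context, st *cluster.Settings, roleARN string) error {
	if roleARN == "" {
		// No assume-role auth requested; nothing to gate.
		return nil
	}
	if !st.Version.IsActive(ctx, clusterversion.SupportAssumeRoleAuth) {
		return errors.New(
			"cannot authenticate via assume role until cluster has fully upgraded to 22.2")
	}
	return nil
}
```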

Note that this gate does not prevent a user from creating a BACKUP job that uses assume-role auth, e.g. via the DETACHED option, because the destination storage is not accessed during planning. This is in line with existing behavior for other types of auth errors, e.g. when the user enters incorrect credentials. The BACKUP job will still fail with the version gate error when it eventually executes.

Release note: None

88596: kv: use RaftAppliedIndexTerm to generate SnapshotMetadata, don't scan log r=nvanbenschoten a=nvanbenschoten

This commit replaces the call to `Term(raftAppliedIndex)` with direct use of the new `RaftAppliedIndexTerm` field (added in c3bc064) when generating a `SnapshotMetadata` in service of the `raft.Storage.Snapshot` interface. As of v22.2, this field has been fully migrated in.
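The shape of the change, as a minimal sketch (the package and free function below are stand-ins; the real code is the `replicaRaftStorage.Snapshot` method shown in the diff further down):

```go
package snapexample

import "go.etcd.io/etcd/raft/v3/raftpb"

// snapshotMetadata builds the snapshot metadata directly from the applied
// state. Previously the term came from r.Term(appliedIndex), which could
// require scanning the raft log; now the term recorded alongside the applied
// index is used as-is.
func snapshotMetadata(raftAppliedIndex, raftAppliedIndexTerm uint64) raftpb.SnapshotMetadata {
	return raftpb.SnapshotMetadata{
		Index: raftAppliedIndex,     // r.mu.state.RaftAppliedIndex
		Term:  raftAppliedIndexTerm, // r.mu.state.RaftAppliedIndexTerm
	}
}
```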

First and foremost, this is a code simplification. However, it also helps with projects like #87050, where async Raft log writes make it possible for a Raft leader to apply an entry before it has been appended to the leader's own log. Such flexibility[^1] would help smooth out tail latency in any single replica's local log writes, even if that replica is the leader itself. This is an important characteristic of quorum systems that we fail to provide because of the tight coupling between the Raft leader's own log writes and the Raft leader's acknowledgment of committed proposals.

Release justification: None. Don't backport to release-22.2.

Release note: None.

[^1]: If safe; I haven't convinced myself that it is in all cases. It certainly is not for operations like non-loosely-coupled log truncation.

88800: kvserver: use type-safe atomics in raftSendQueue r=nvanbenschoten a=pavelkalinnikov

Go 1.19 introduced atomic types that enforce atomic access to variables, which in many situations is less error-prone. This commit resolves a TODO by taking advantage of these types.
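A minimal, self-contained illustration of the pattern (the queue type here is a stand-in for the real `raftSendQueue`):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sendQueue mirrors the raftSendQueue pattern: atomic.Int64 can only be
// touched through its atomic methods, so a plain (racy) read or write of the
// counter no longer compiles.
type sendQueue struct {
	bytes atomic.Int64 // bytes in flight
}

func main() {
	var q sendQueue
	q.bytes.Add(512)  // enqueue a 512-byte message
	q.bytes.Add(-512) // dequeue it
	// Pre-1.19 equivalent: atomic.AddInt64(&q.bytes, 512) / atomic.LoadInt64(&q.bytes).
	fmt.Println(q.bytes.Load()) // 0
}
```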

Release note: None

Co-authored-by: Eric Harmeling <eric.harmeling@cockroachlabs.com>
Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
Co-authored-by: Rui Hu <rui@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
6 people committed Sep 27, 2022
6 parents 1b63943 + 15e8355 + 27299e0 + cdb0bf4 + 89058d5 + 0b7fec8 commit 03b8afb
Showing 59 changed files with 796 additions and 328 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -298,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.1-72 set the active cluster version in the format '<major>.<minor>'
version version 1000022.1-74 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -232,6 +232,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-72</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-74</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/cloud/amazon/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/cloud/externalconn/utils",
"//pkg/clusterversion",
"//pkg/security/username",
"//pkg/server/telemetry",
"//pkg/settings",
5 changes: 5 additions & 0 deletions pkg/cloud/amazon/aws_kms.go
@@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kms"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/errors"
)

@@ -161,6 +162,10 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
}

if kmsURIParams.roleARN != "" {
if !env.ClusterSettings().Version.IsActive(ctx, clusterversion.SupportAssumeRoleAuth) {
return nil, errors.New("cannot authenticate to KMS via assume role until cluster has fully upgraded to 22.2")
}

// If there are delegate roles in the assume-role chain, we create a session
// for each role in order for it to fetch the credentials from the next role
// in the chain.
5 changes: 5 additions & 0 deletions pkg/cloud/amazon/s3_storage.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -470,6 +471,10 @@ func newClient(
}

if conf.roleARN != "" {
if !settings.Version.IsActive(ctx, clusterversion.SupportAssumeRoleAuth) {
return s3Client{}, "", errors.New("cannot authenticate to cloud storage via assume role until cluster has fully upgraded to 22.2")
}

for _, role := range conf.delegateRoleARNs {
intermediateCreds := stscreds.NewCredentials(sess, role)
opts.Config.Credentials = intermediateCreds
1 change: 1 addition & 0 deletions pkg/cloud/gcp/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/cloud/externalconn/utils",
"//pkg/clusterversion",
"//pkg/security/username",
"//pkg/server/telemetry",
"//pkg/settings",
5 changes: 5 additions & 0 deletions pkg/cloud/gcp/gcp_kms.go
@@ -18,6 +18,7 @@ import (

kms "cloud.google.com/go/kms/apiv1"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/errors"
"google.golang.org/api/option"
kmspb "google.golang.org/genproto/googleapis/cloud/kms/v1"
@@ -126,6 +127,10 @@ func MakeGCSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
if kmsURIParams.assumeRole == "" {
opts = append(opts, credentialsOpt...)
} else {
if !env.ClusterSettings().Version.IsActive(ctx, clusterversion.SupportAssumeRoleAuth) {
return nil, errors.New("cannot authenticate to KMS via assume role until cluster has fully upgraded to 22.2")
}

assumeOpt, err := createImpersonateCredentials(ctx, kmsURIParams.assumeRole, kmsURIParams.delegateRoles, kms.DefaultAuthScopes(), credentialsOpt...)
if err != nil {
return nil, errors.Wrapf(err, "failed to assume role")
5 changes: 5 additions & 0 deletions pkg/cloud/gcp/gcs_storage.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -181,6 +182,10 @@ func makeGCSStorage(
if conf.AssumeRole == "" {
opts = append(opts, credentialsOpt...)
} else {
if !args.Settings.Version.IsActive(ctx, clusterversion.SupportAssumeRoleAuth) {
return nil, errors.New("cannot authenticate to cloud storage via assume role until cluster has fully upgraded to 22.2")
}

assumeOpt, err := createImpersonateCredentials(ctx, conf.AssumeRole, conf.AssumeRoleDelegates, []string{scope}, credentialsOpt...)
if err != nil {
return nil, errors.Wrapf(err, "failed to assume role")
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -298,6 +298,9 @@ const (
// leases to nodes that (i) don't expect them for certain keyspans, and (ii)
// don't know to upgrade them to efficient epoch-based ones.
EnableLeaseUpgrade
// SupportAssumeRoleAuth is the version where assume role authorization is
// supported in cloud storage and KMS.
SupportAssumeRoleAuth

// *************************************************
// Step (1): Add new versions here.
@@ -481,6 +484,10 @@ var rawVersionsSingleton = keyedVersions{
Key: EnableLeaseUpgrade,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 72},
},
{
Key: SupportAssumeRoleAuth,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 74},
},

// *************************************************
// Step (2): Add new versions here.
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

13 changes: 6 additions & 7 deletions pkg/kv/kvserver/raft_transport.go
@@ -166,8 +166,7 @@ type raftSendQueue struct {
reqs chan *kvserverpb.RaftMessageRequest
// The number of bytes in flight. Must be updated *atomically* on sending and
// receiving from the reqs channel.
// TODO(pavelkalinnikov): replace by atomic.Int64 when CRDB uses Go 1.19.
bytes int64
bytes atomic.Int64
}

// NewDummyRaftTransport returns a dummy raft transport for use in tests which
@@ -228,7 +227,7 @@ func (t *RaftTransport) queueMessageCount() int64 {
// queueByteSize returns the total bytes size of outgoing messages in the queue.
func (t *RaftTransport) queueByteSize() int64 {
var size int64
t.visitQueues(func(q *raftSendQueue) { size += atomic.LoadInt64(&q.bytes) })
t.visitQueues(func(q *raftSendQueue) { size += q.bytes.Load() })
return size
}

@@ -441,7 +440,7 @@ func (t *RaftTransport) processQueue(
return err
case req := <-q.reqs:
size := int64(req.Size())
atomic.AddInt64(&q.bytes, -size)
q.bytes.Add(-size)
budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - size
batch.Requests = append(batch.Requests, *req)
releaseRaftMessageRequest(req)
@@ -450,7 +449,7 @@ func (t *RaftTransport) processQueue(
select {
case req = <-q.reqs:
size := int64(req.Size())
atomic.AddInt64(&q.bytes, -size)
q.bytes.Add(-size)
budget -= size
batch.Requests = append(batch.Requests, *req)
releaseRaftMessageRequest(req)
@@ -532,7 +531,7 @@ func (t *RaftTransport) SendAsync(
size := int64(req.Size())
select {
case q.reqs <- req:
atomic.AddInt64(&q.bytes, size)
q.bytes.Add(size)
return true
default:
if logRaftSendQueueFullEvery.ShouldLog() {
@@ -564,7 +563,7 @@ func (t *RaftTransport) startProcessNewQueue(
for {
select {
case req := <-q.reqs:
atomic.AddInt64(&q.bytes, -int64(req.Size()))
q.bytes.Add(-int64(req.Size()))
t.metrics.MessagesDropped.Inc(1)
releaseRaftMessageRequest(req)
default:
20 changes: 3 additions & 17 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -925,22 +924,11 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
ctx context.Context, cmd *replicatedCmd,
) {
raftAppliedIndex := cmd.ent.Index
if raftAppliedIndex == 0 {
if cmd.ent.Index == 0 {
log.Fatalf(ctx, "raft entry with index 0")
}
b.state.RaftAppliedIndex = raftAppliedIndex
rs := cmd.decodedRaftEntry.replicatedResult().State
// We are post migration or this replicatedCmd is doing the migration.
if b.state.RaftAppliedIndexTerm > 0 || (rs != nil &&
rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) {
// Once we populate b.state.RaftAppliedIndexTerm it will flow into the
// persisted RangeAppliedState and into the in-memory representation in
// Replica.mu.state. The latter is used to initialize b.state, so future
// calls to this method will see that the migration has already happened
// and will continue to populate the term.
b.state.RaftAppliedIndexTerm = cmd.ent.Term
}
b.state.RaftAppliedIndex = cmd.ent.Index
b.state.RaftAppliedIndexTerm = cmd.ent.Term

if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
@@ -1001,8 +989,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
r := b.r
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
// RaftAppliedIndexTerm will be non-zero only when the
// AddRaftAppliedIndexTermMigration has happened.
r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm
r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex

26 changes: 4 additions & 22 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -414,15 +414,10 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 {
// Snapshot method.
func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) {
r.mu.AssertHeld()
appliedIndex := r.mu.state.RaftAppliedIndex
term, err := r.Term(appliedIndex)
if err != nil {
return raftpb.Snapshot{}, err
}
return raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: appliedIndex,
Term: term,
Index: r.mu.state.RaftAppliedIndex,
Term: r.mu.state.RaftAppliedIndexTerm,
},
}, nil
}
@@ -605,17 +600,6 @@ func snapshot(
return OutgoingSnapshot{}, err
}

term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex)
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm {
return OutgoingSnapshot{},
errors.AssertionFailedf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm)
}
if err != nil {
return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex)
}

return OutgoingSnapshot{
RaftEntryCache: eCache,
WithSideloaded: withSideloaded,
@@ -626,7 +610,7 @@
Data: snapUUID.GetBytes(),
Metadata: raftpb.SnapshotMetadata{
Index: state.RaftAppliedIndex,
Term: term,
Term: state.RaftAppliedIndexTerm,
// Synthesize our raftpb.ConfState from desc.
ConfState: desc.Replicas().ConfState(),
},
@@ -979,9 +963,7 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
}
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
if state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d",
state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term)
}
@@ -11,3 +11,4 @@
export * from "./indexUsageStatsRec";
export * from "./schemaInsightsView";
export * from "./emptySchemaInsightsTablePlaceholder";
export * from "./schemaInsightsPageConnected";
@@ -0,0 +1,67 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import { connect } from "react-redux";
import { RouteComponentProps, withRouter } from "react-router-dom";
import {
actions,
selectSchemaInsights,
selectSchemaInsightsDatabases,
selectSchemaInsightsError,
selectSchemaInsightsTypes,
selectFilters,
selectSortSetting,
} from "src/store/schemaInsights";
import { AppState } from "src/store";
import {
SchemaInsightsView,
SchemaInsightsViewDispatchProps,
SchemaInsightsViewStateProps,
} from "./schemaInsightsView";
import { SchemaInsightEventFilters } from "../types";
import { SortSetting } from "src/sortedtable";
import { actions as localStorageActions } from "../../store/localStorage";

const mapStateToProps = (
state: AppState,
_props: RouteComponentProps,
): SchemaInsightsViewStateProps => ({
schemaInsights: selectSchemaInsights(state),
schemaInsightsDatabases: selectSchemaInsightsDatabases(state),
schemaInsightsTypes: selectSchemaInsightsTypes(state),
schemaInsightsError: selectSchemaInsightsError(state),
filters: selectFilters(state),
sortSetting: selectSortSetting(state),
});

const mapDispatchToProps = {
onFiltersChange: (filters: SchemaInsightEventFilters) =>
localStorageActions.update({
key: "filters/SchemaInsightsPage",
value: filters,
}),
onSortChange: (ss: SortSetting) =>
localStorageActions.update({
key: "sortSetting/SchemaInsightsPage",
value: ss,
}),
refreshSchemaInsights: actions.refresh,
};

export const SchemaInsightsPageConnected = withRouter(
connect<
SchemaInsightsViewStateProps,
SchemaInsightsViewDispatchProps,
RouteComponentProps
>(
mapStateToProps,
mapDispatchToProps,
)(SchemaInsightsView),
);
@@ -202,7 +202,7 @@ export const SchemaInsightsView: React.FC<SchemaInsightsViewProps> = ({
</PageConfig>
<div className={cx("table-area")}>
<Loading
loading={schemaInsights == null}
loading={schemaInsights === null}
page="schema insights"
error={schemaInsightsError}
renderError={() => InsightsError()}
@@ -227,7 +227,7 @@ export const SchemaInsightsView: React.FC<SchemaInsightsViewProps> = ({
renderNoResult={
<EmptySchemaInsightsTablePlaceholder
isEmptySearchResults={
search?.length > 0 && filteredSchemaInsights?.length == 0
search?.length > 0 && filteredSchemaInsights?.length === 0
}
/>
}