Merge #98331 #98428

98331: ui: remove polling from fingerprints pages, allow new stats requests while one is in flight r=maryliag,j82w a=xinhaoz

See individual commits.

The changes here serve as a base for the performance improvements we'll be making.
Since they're not specifically tied to those changes (e.g. adding limit/sort and splitting the API calls), I've put them in their own PR to make reviewing easier.

Loom video verifying the behaviour and showing new requests being dispatched while others are pending.
Pages shown:
- stmt fingerprints
- txn fingerprints
- txn fingerprint details
- stmt fingerprint details

https://www.loom.com/share/3448117bfecf404c8d698f4ad1240e8c

CC:
https://www.loom.com/share/bb30b51ebe5144528ab0c6fabdbfb2f1

98428: flowinfra: fix a couple of rare bugs with flow cleanup r=yuzefovich a=yuzefovich

**sql: minor cleanup around flow's wait group**

This commit makes minor adjustments so that `WaitGroup.Done` is called
visibly close to where the corresponding `Add` is. This makes it easier to
see that the wait group is cleaned up properly.
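
A minimal runnable sketch of the pattern being enforced (illustrative only, not the CockroachDB code itself):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	// Add sits immediately before the goroutine whose deferred Done
	// balances it, so the pairing is visible at a glance.
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("async work")
	}()
	wg.Wait()
}
```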

Release note: None

**flowinfra: fix a couple of rare bugs with flow cleanup**

This commit fixes a couple of rare bugs around the flow cleanup that
were encountered when running `tpch_concurrency` roachtest at the
concurrency of 1000 or so (i.e. for the bugs to reproduce we needed
extremely overloaded nodes).

The setup for the bugs is as follows:
- we have some number of inbound streams on the node. The inbound
streams in this context are the server side of the `FlowStream` RPCs
(i.e. the inbox side)
- each inbound stream needs a separate goroutine (created by the gRPC
framework), and these goroutines are tracked against the wait group of
the flow
- the main goroutine of the flow registers the flow with the
`FlowRegistry` and gives it a timeout (10 seconds by default) to let the
inbound streams arrive
- we have synchronization in place to block the inbound streams until
the flow is registered
- once the inbound stream finds its flow in the registry, it sends
a handshake RPC to the producer (i.e. to the outbox). A simplified sketch
of this registration dance follows the list.
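
A heavily simplified, hypothetical model of the registration/timeout portion of this setup; none of these names are the real CockroachDB types:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// flowRegistry is a stand-in for the real FlowRegistry.
type flowRegistry struct {
	mu    sync.Mutex
	flows map[string]chan struct{} // closed once the flow is registered
}

func (fr *flowRegistry) register(id string) {
	fr.mu.Lock()
	defer fr.mu.Unlock()
	close(fr.flows[id]) // unblock inbound streams waiting for this flow
}

// connectInboundStream blocks until the flow is registered or the timeout
// fires, mirroring the (10 second by default) inbound stream timeout.
func (fr *flowRegistry) connectInboundStream(id string, timeout time.Duration) error {
	fr.mu.Lock()
	ch := fr.flows[id]
	fr.mu.Unlock()
	select {
	case <-ch:
		return nil // flow found; the handshake RPC would happen next
	case <-time.After(timeout):
		return errors.New("inbound stream timed out waiting for flow")
	}
}

func main() {
	fr := &flowRegistry{flows: map[string]chan struct{}{"f1": make(chan struct{})}}
	go func() {
		time.Sleep(10 * time.Millisecond) // flow setup work
		fr.register("f1")
	}()
	fmt.Println(fr.connectInboundStream("f1", time.Second)) // <nil>
}
```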

The first bug is about not decrementing the wait group of the flow in
some cases. In particular, if the inbound stream timeout occurs _while_
the inbound stream is performing the "handshake" RPC _and_ that RPC
results in an error, then the wait group wouldn't be decremented. This
was the case because
1. the "timeout" goroutine would observe that the stream is "connected",
so it would treat the timeout cancellation as a no-op
2. the inbound stream goroutine would get an error on the handshake, so
it would bubble the error up but would never call
`InboundStreamInfo.onFinish`.

This bug is now fixed by only marking the stream as "connected" once the
handshake succeeds. This bug was introduced about a year ago in 62ea0c6
when we refactored the locking around the flow registry.
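
A minimal runnable model of the fixed connection path, under illustrative names (the real change is in the `pkg/sql/flowinfra/flow_registry.go` hunk further below): "connected" is only set after the handshake succeeds, and on handshake failure the stream is finished immediately, so the flow's wait group is always decremented.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type inboundStream struct {
	mu        sync.Mutex
	connected bool
	canceled  bool
	onFinish  func()
}

func (s *inboundStream) connect(handshake func() error) error {
	err := handshake() // perform the handshake RPC first
	s.mu.Lock()
	defer s.mu.Unlock()
	if err != nil {
		s.canceled = true
		s.onFinish() // balances the WaitGroup.Add for this stream
		return err
	}
	s.connected = true
	return nil
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	s := &inboundStream{onFinish: wg.Done}
	fmt.Println(s.connect(func() error { return errors.New("handshake RPC failed") }))
	wg.Wait() // does not hang: Done was called despite the handshake error
}
```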

The second bug is related. Previously, if `ConnectInboundStream`
resulted in an error, this could leave one of the inbox goroutines
blocked forever in `Inbox.Init`. In particular, the inbox waits until one
of the following occurs (sketched in the snippet after this list):
- the stream arrives and successfully performs the handshake
- the inbound stream timeout occurs
- the flow context is canceled
- the inbox context is canceled.
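
Illustrative only: an `Inbox.Init`-style wait on the four events listed above, with hypothetical parameter names. If none of the four ever fires (the scenario of the second bug), the select blocks forever.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func waitForStream(
	flowCtx, inboxCtx context.Context,
	streamConnected, streamTimeout <-chan struct{},
) error {
	select {
	case <-streamConnected: // stream arrived and handshake succeeded
		return nil
	case <-streamTimeout: // inbound stream timeout fired
		return fmt.Errorf("no inbound stream arrived")
	case <-flowCtx.Done(): // flow context canceled
		return flowCtx.Err()
	case <-inboxCtx.Done(): // inbox context canceled
		return inboxCtx.Err()
	}
}

func main() {
	flowCtx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(10*time.Millisecond, cancel)
	// nil channels block forever in a select, so only the flow context
	// cancellation can unblock this call.
	fmt.Println(waitForStream(flowCtx, context.Background(), nil, nil))
}
```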

In the scenario from the first bug, the first point wasn't true because
the handshake resulted in an error. The second point wasn't true because
the stream was marked as "connected" so it was skipped when
`flowEntry.streamTimer` fired (meaning that
`InboundStreamHandler.Timeout` wasn't called). The contexts weren't
explicitly canceled either (the incorrect assumption was that the flow
context would be canceled by the outboxes on the node, but we might have
plans where the gateway flow doesn't have any outboxes).

This second bug is now fixed by explicitly canceling the flow whenever an
inbound stream connection results in an error. As an additional
nice-to-have improvement, the outboxes now cancel the flow context on
their nodes whenever the `FlowStream` RPC fails - the query is doomed, so
we might as well cancel the flow sooner.
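
A sketch of that outbox-side improvement under assumed names (`runOutbox` and `setupStream` are hypothetical; the actual change is in the `pkg/sql/colflow/colrpc/outbox.go` hunk below):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// A failed FlowStream RPC dooms the query, so the flow context on the
// node is canceled right away instead of waiting for the error to
// propagate through the rest of the plan.
func runOutbox(flowCtxCancel context.CancelFunc, setupStream func() error) {
	if err := setupStream(); err != nil {
		// Canceling the flow now also unblocks any inbox goroutines
		// stuck waiting on this node.
		flowCtxCancel()
		return
	}
	// ... normal data pushing would happen here ...
}

func main() {
	flowCtx, flowCtxCancel := context.WithCancel(context.Background())
	runOutbox(flowCtxCancel, func() error { return errors.New("FlowStream RPC failed") })
	<-flowCtx.Done()
	fmt.Println("flow context canceled:", flowCtx.Err())
}
```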

I decided not to include a release note here since the prerequisite for
these bugs to occur is that the inbound streams time out exactly while
performing the handshake RPC. The 10 second default value for the timeout
makes this extremely unlikely, and the only way we ran into it was by
severely overloading the nodes (at which point the user would have bigger
problems than these deadlocks).

Fixes: #94113.

Release note: None

**roachtest: increase bounds for tpch_concurrency**

This commit adjusts the bounds for the `tpch_concurrency` roachtest. Given
that we now set GOMEMLIMIT by default, we can sustain much higher
concurrency without falling over. I ran the test manually with the
`[128, 1024)` range and got 972.938 on average (out of 20 runs, of which
4 timed out). To prevent the timeouts, this commit uses a small interval
(less than 64 in length) so that we make 6 iterations in the binary
search. As a result, the new search range is [970, 1030).
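
As a quick sanity check of the iteration count (not part of the commit): binary search over [970, 1030) halves the interval until its length is 1, which takes at most ceil(log2(60)) = 6 probes.

```go
package main

import "fmt"

func main() {
	min, max := 970, 1030
	iterations := 0
	for min < max-1 {
		mid := (min + max) / 2
		min = mid // taking the larger half each time is the worst case
		iterations++
	}
	fmt.Println(iterations) // 6
}
```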

Additionally, it was observed that Q15 (which performs two schema
changes) can take a non-trivial amount of time, so it is now skipped in
this test. This commit also makes a minor improvement to fail the test
if all iterations resulted in a node crash.

Release note: None

Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
3 people committed Mar 16, 2023
3 parents 82fd797 + d12634b + 060b475 commit a6c94d3
Showing 38 changed files with 522 additions and 444 deletions.
52 changes: 38 additions & 14 deletions pkg/cmd/roachtest/tests/tpch_concurrency.go
@@ -13,6 +13,7 @@ package tests
import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
@@ -94,6 +95,13 @@ func registerTPCHConcurrency(r registry.Registry) {
t.Status(fmt.Sprintf("running with concurrency = %d", concurrency))
// Run each query once on each connection.
for queryNum := 1; queryNum <= tpch.NumQueries; queryNum++ {
if queryNum == 15 {
// Skip Q15 because it involves a schema change which - when
// run with high concurrency - takes non-trivial amount of
// time.
t.Status("skipping Q", queryNum)
continue
}
t.Status("running Q", queryNum)
// The way --max-ops flag works is as follows: the global ops
// counter is incremented **after** each worker completes a
@@ -153,41 +161,56 @@ func registerTPCHConcurrency(r registry.Registry) {
disableStreamer bool,
) {
setupCluster(ctx, t, c, disableStreamer)
// TODO(yuzefovich): once we have a good grasp on the expected value for
// max supported concurrency, we should use search.Searcher instead of
// the binary search here. Additionally, we should introduce an
// additional step to ensure that some kind of lower bound for the
// supported concurrency is always sustained and fail the test if it
// isn't.
minConcurrency, maxConcurrency := 50, 110
// We use [970, 1030) range to perform the binary search over because
// - the interval length is smaller than 64, so we will need 6
// iterations. Each iteration can take 2-3 hours, and we have 18 hour
// timeout, so 6 iterations is the maximum without marking the test as
// "weekly".
// - the interval is centered around 1000. We often can push this a bit
// higher, but then the iterations also get longer. 1000 concurrently
// running analytical queries on the 3 node cluster that doesn't crash
// is much more than we expect our users to run.
const minConcurrency, maxConcurrency = 970, 1030
min, max := minConcurrency, maxConcurrency
// Run the binary search to find the largest concurrency that doesn't
// crash a node in the cluster. The current range is represented by
// [minConcurrency, maxConcurrency).
for minConcurrency < maxConcurrency-1 {
concurrency := (minConcurrency + maxConcurrency) / 2
// [min, max).
for min < max-1 {
concurrency := (min + max) / 2
if err := checkConcurrency(ctx, t, c, concurrency); err != nil {
maxConcurrency = concurrency
max = concurrency
} else {
minConcurrency = concurrency
min = concurrency
}
}
// Restart the cluster so that if any nodes crashed in the last
// iteration, it doesn't fail the test.
restartCluster(ctx, c, t)
t.Status(fmt.Sprintf("max supported concurrency is %d", minConcurrency))
t.Status(fmt.Sprintf("max supported concurrency is %d", min))
// Write the concurrency number into the stats.json file to be used by
// the roachperf.
c.Run(ctx, c.Node(numNodes), "mkdir", t.PerfArtifactsDir())
cmd := fmt.Sprintf(
`echo '{ "max_concurrency": %d }' > %s/stats.json`,
minConcurrency, t.PerfArtifactsDir(),
min, t.PerfArtifactsDir(),
)
c.Run(ctx, c.Node(numNodes), cmd)
if min == minConcurrency {
// In this case, we had a node crash in each iteration of the binary
// search. This is unexpected, so fail the test.
t.Fatal("couldn't sustain minimum concurrency")
}
}

// Each iteration of the binary search can take on the order of 2-3 hours
// (with concurrency around 1000), so use the longest timeout allowed by the
// roachtest infra (without marking the test as "weekly").
const timeout = 18 * time.Hour

r.Add(registry.TestSpec{
Name: "tpch_concurrency",
Owner: registry.OwnerSQLQueries,
Timeout: timeout,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCHConcurrency(ctx, t, c, false /* disableStreamer */)
@@ -197,6 +220,7 @@ func registerTPCHConcurrency(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "tpch_concurrency/no_streamer",
Owner: registry.OwnerSQLQueries,
Timeout: timeout,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCHConcurrency(ctx, t, c, true /* disableStreamer */)
6 changes: 3 additions & 3 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
@@ -220,12 +220,12 @@ func (s *ParallelUnorderedSynchronizer) init() {
go func(input colexecargs.OpWithMetaInfo, inputIdx int) {
span := s.tracingSpans[inputIdx]
defer func() {
if span != nil {
defer span.Finish()
}
if int(atomic.AddUint32(&s.numFinishedInputs, 1)) == len(s.inputs) {
close(s.batchCh)
}
if span != nil {
span.Finish()
}
s.internalWaitGroup.Done()
s.externalWaitGroup.Done()
}()
6 changes: 5 additions & 1 deletion pkg/sql/colflow/colrpc/outbox.go
@@ -204,6 +204,8 @@ func (o *Outbox) Run(
return err
}

// TODO(yuzefovich): the row-based outbox sends the header as part of
// the first message with data, consider doing that here too.
log.VEvent(ctx, 2, "Outbox sending header")
// Send header message to establish the remote server (consumer).
if err := stream.Send(
@@ -218,7 +220,9 @@ }
}
return nil
}(); err != nil {
// error during stream set up.
// An error during stream setup - the whole query will fail, so we might
// as well proactively cancel the flow on this node.
flowCtxCancel()
o.close(ctx)
return
}
8 changes: 2 additions & 6 deletions pkg/sql/colflow/vectorized_flow.go
@@ -1326,14 +1326,10 @@ func (r *vectorizedFlowCreatorHelper) checkInboundStreamID(sid execinfrapb.Strea
func (r *vectorizedFlowCreatorHelper) accumulateAsyncComponent(run runFn) {
r.f.AddStartable(
flowinfra.StartableFn(func(ctx context.Context, wg *sync.WaitGroup, flowCtxCancel context.CancelFunc) {
if wg != nil {
wg.Add(1)
}
wg.Add(1)
go func() {
defer wg.Done()
run(ctx, flowCtxCancel)
if wg != nil {
wg.Done()
}
}()
}))
}
77 changes: 36 additions & 41 deletions pkg/sql/flowinfra/flow_registry.go
@@ -90,31 +90,6 @@ func NewInboundStreamInfo(
}
}

// connect marks s as connected. It is an error if s was already marked either
// as connected or canceled. It should be called without holding any mutexes.
func (s *InboundStreamInfo) connect(
flowID execinfrapb.FlowID, streamID execinfrapb.StreamID,
) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.connected {
return errors.Errorf("flow %s: inbound stream %d already connected", flowID, streamID)
}
if s.mu.canceled {
return errors.Errorf("flow %s: inbound stream %d came too late", flowID, streamID)
}
s.mu.connected = true
return nil
}

// disconnect marks s as not connected. It should be called without holding any
// mutexes.
func (s *InboundStreamInfo) disconnect() {
s.mu.Lock()
s.mu.connected = false
s.mu.Unlock()
}

// finishLocked marks s as finished and calls onFinish. The mutex of s must be
// held when calling this method.
func (s *InboundStreamInfo) finishLocked() {
@@ -124,7 +99,6 @@ func (s *InboundStreamInfo) finishLocked() {
if s.mu.finished {
panic("double finish")
}

s.mu.finished = true
s.onFinish()
}
@@ -484,11 +458,10 @@ func (fr *FlowRegistry) Drain(
}
// Now cancel all still running flows.
for _, f := range fr.flows {
if f.flow != nil && f.flow.ctxCancel != nil {
if f.flow != nil {
// f.flow might be nil when ConnectInboundStream() was
// called, but the consumer of that inbound stream hasn't
// been scheduled yet.
// f.flow.ctxCancel might be nil in tests.
f.flow.ctxCancel()
}
}
@@ -571,7 +544,7 @@ func (fr *FlowRegistry) ConnectInboundStream(
streamID execinfrapb.StreamID,
stream execinfrapb.DistSQL_FlowStreamServer,
timeout time.Duration,
) (*FlowBase, InboundStreamHandler, func(), error) {
) (_ *FlowBase, _ InboundStreamHandler, cleanup func(), retErr error) {
fr.Lock()
entry := fr.getEntryLocked(flowID)
flow := entry.flow
@@ -604,30 +577,52 @@ }
}
}

defer func() {
if retErr != nil {
// If any error is encountered below, we know that the distributed
// query execution will fail, so we cancel the flow on this node. If
// this node is the gateway, this might actually be required for
// proper shutdown of the whole distributed plan.
flow.ctxCancel()
}
}()

// entry.inboundStreams is safe to access without holding the mutex since
// the map is not modified after Flow.Setup.
s, ok := entry.inboundStreams[streamID]
if !ok {
return nil, nil, nil, errors.Errorf("flow %s: no inbound stream %d", flowID, streamID)
}
// We now mark the stream as connected but, if an error happens later
// because the handshake fails, we reset the state; we want the stream to be
// considered timed out when the moment comes just as if this connection
// attempt never happened.
if err := s.connect(flowID, streamID); err != nil {
return nil, nil, nil, err
}

if err := stream.Send(&execinfrapb.ConsumerSignal{
// Don't mark s as connected until after the handshake succeeds.
handshakeErr := stream.Send(&execinfrapb.ConsumerSignal{
Handshake: &execinfrapb.ConsumerHandshake{
ConsumerScheduled: true,
Version: execinfra.Version,
MinAcceptedVersion: execinfra.MinAcceptedVersion,
},
}); err != nil {
s.disconnect()
return nil, nil, nil, err
}
})

s.mu.Lock()
defer s.mu.Unlock()
if s.mu.canceled {
// Regardless of whether the handshake succeeded or not, this inbound
// stream has already been canceled and properly finished.
return nil, nil, nil, errors.Errorf("flow %s: inbound stream %d came too late", flowID, streamID)
}
if handshakeErr != nil {
// The handshake failed, so we're canceling this stream.
s.mu.canceled = true
s.finishLocked()
return nil, nil, nil, handshakeErr
}
if s.mu.connected {
// This is unexpected - the FlowStream RPC was issued twice by the
// outboxes for the same stream. We are processing the second RPC call
// right now, so there is another goroutine that will handle the
// cleanup, so we defer the cleanup to that goroutine.
return nil, nil, nil, errors.AssertionFailedf("flow %s: inbound stream %d already connected", flowID, streamID)
}
s.mu.connected = true
return flow, s.receiver, s.finish, nil
}