diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index 1e73b666881c..aab10254453f 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -398,7 +398,6 @@ type rawEventFeed []kvpb.RangeFeedEvent func (f rawEventFeed) run( ctx context.Context, spans []kvcoord.SpanTimePair, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 190a5b7eecff..1eb61891c5fe 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -37,7 +37,6 @@ type rangeFeedConfig struct { type rangefeedFactory func( ctx context.Context, spans []kvcoord.SpanTimePair, - withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, opts ...kvcoord.RangeFeedOption, ) error @@ -79,9 +78,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang if cfg.UseMux { rfOpts = append(rfOpts, kvcoord.WithMuxRangeFeed()) } + if cfg.WithDiff { + rfOpts = append(rfOpts, kvcoord.WithDiff()) + } g.GoCtx(func(ctx context.Context) error { - return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC, rfOpts...) + return p(ctx, cfg.Spans, feed.eventC, rfOpts...) }) return g.Wait() } diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index ced73d0b9909..f85bb1e61855 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -69,6 +69,7 @@ go_library( "//pkg/util/ctxgroup", "//pkg/util/envutil", "//pkg/util/errorutil/unimplemented", + "//pkg/util/future", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/iterutil", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index 38086a99c1ff..2792f71bc941 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -12,16 +12,24 @@ package kvcoord import ( "context" - "sync" + "io" + "net" "sync/atomic" + "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/future" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -32,24 +40,15 @@ import ( // rangefeeds. rangefeedMuxer caches MuxRangeFeed stream per node, and executes // each range feed request on an appropriate node. type rangefeedMuxer struct { - // eventCh receives events from all active muxStreams. - eventCh chan *kvpb.MuxRangeFeedEvent - // Context group controlling execution of MuxRangeFeed calls. When this group - // cancels, the entire muxer shuts down. The goroutines started in `g` will - // always return `nil` errors except when they detect that the mux is shutting - // down. + // cancels, the entire muxer shuts down. 
g ctxgroup.Group - // When g cancels, demuxLoopDone gets closed. - demuxLoopDone chan struct{} - - mu struct { - syncutil.Mutex - - // map of active MuxRangeFeed clients. - clients map[roachpb.NodeID]*muxClientState - } + ds *DistSender + cfg rangeFeedConfig + registry *rangeFeedRegistry + catchupSem *limit.ConcurrentRequestLimiter + eventCh chan<- RangeFeedMessage // Each call to start new range feed gets a unique ID which is echoed back // by MuxRangeFeed rpc. This is done as a safety mechanism to make sure @@ -58,161 +57,233 @@ type rangefeedMuxer struct { // Accessed atomically. seqID int64 - // producers is a map of all rangefeeds running across all nodes. - // streamID -> *channelRangeFeedEventProducer. - producers syncutil.IntMap + // muxClient is a nodeID -> *muxStreamOrError + muxClients syncutil.IntMap } -// muxClientState is the state maintained for each MuxRangeFeed rpc. -type muxClientState struct { - initCtx termCtx // signaled when client ready to be used. - doneCtx terminationContext // signaled when client shuts down. - - // RPC state. Valid only after initCtx.Done(). - client kvpb.Internal_MuxRangeFeedClient - cancel context.CancelFunc +// muxRangeFeed is an entry point to establish MuxRangeFeed +// RPC for the specified spans. Waits for rangefeed to complete. +func muxRangeFeed( + ctx context.Context, + cfg rangeFeedConfig, + spans []SpanTimePair, + ds *DistSender, + rr *rangeFeedRegistry, + catchupSem *limit.ConcurrentRequestLimiter, + eventCh chan<- RangeFeedMessage, +) (retErr error) { + if log.V(1) { + log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans)) + start := timeutil.Now() + defer func() { + log.Infof(ctx, "MuxRangeFeed terminating after %s with err=%v", timeutil.Since(start), retErr) + }() + } - // Number of consumers (ranges) running on this node; accessed under rangefeedMuxer lock. - numStreams int + m := &rangefeedMuxer{ + g: ctxgroup.WithContext(ctx), + registry: rr, + ds: ds, + cfg: cfg, + catchupSem: catchupSem, + eventCh: eventCh, + } + divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g) + return errors.CombineErrors(m.g.Wait(), ctx.Err()) } -func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { - m := &rangefeedMuxer{ - eventCh: make(chan *kvpb.MuxRangeFeedEvent), - demuxLoopDone: make(chan struct{}), - g: g, +// muxStream represents MuxRangeFeed RPC established with a node. +// +// MuxRangeFeed is a bidirectional RPC: the muxStream.sender is the client -> +// server portion of the stream, and muxStream.receiver is the server -> client +// portion. Any number of RangeFeedRequests may be initiated with the server +// (sender.Send). The server will send MuxRangeFeed for all the range feeds, and +// those events are received via receiver.Recv. If an error occurs with one of +// the logical range feeds, a MuxRangeFeedEvent describing the error will be +// emitted. This error can be handled appropriately, and rangefeed may be +// restarted. The sender and receiver may continue to be used to handle other +// requests and events. However, if either sender or receiver return an error, +// the entire stream must be torn down, and all active range feeds should be +// restarted. +type muxStream struct { + nodeID roachpb.NodeID + streams syncutil.IntMap // streamID -> *activeMuxRangeFeed. + + // mu must be held when starting rangefeed. 
+ mu struct { + syncutil.Mutex + sender rangeFeedRequestSender + closed bool } +} - m.mu.clients = make(map[roachpb.NodeID]*muxClientState) - m.g.GoCtx(m.demuxLoop) +// muxStreamOrError is a tuple of mux stream connection or an error that +// occurred while connecting to the node. +type muxStreamOrError struct { + stream *muxStream + err error +} - return m +// activeMuxRangeFeed augments activeRangeFeed with additional state. +type activeMuxRangeFeed struct { + *activeRangeFeed + token rangecache.EvictionToken + startAfter hlc.Timestamp + catchupRes limit.Reservation } -// channelRangeFeedEventProducer is a rangeFeedEventProducer which receives -// events on input channel, and returns events when Recv is called. -type channelRangeFeedEventProducer struct { - // Event producer utilizes two contexts: - // - // - callerCtx connected to singleRangeFeed, i.e. a context that will cancel - // if a single-range rangefeed fails (range stuck, parent ctx cancels). - // - muxClientCtx connected to receiveEventsFromNode, i.e. a streaming RPC to - // a node serving multiple rangefeeds. This cancels if, for example, the - // remote node goes down or there are networking issues. - // - // When singleRangeFeed blocks in Recv(), we have to respect cancellations in - // both contexts. The implementation of Recv() on this type does this. - callerCtx context.Context - muxClientCtx terminationContext - - streamID int64 // stream ID for this producer. - eventCh chan *kvpb.RangeFeedEvent // consumer event channel. +func (a *activeMuxRangeFeed) release() { + a.activeRangeFeed.release() + if a.catchupRes != nil { + a.catchupRes.Release() + } } -var _ kvpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) +// the "Send" portion of the kvpb.Internal_MuxRangeFeedClient +type rangeFeedRequestSender interface { + Send(req *kvpb.RangeFeedRequest) error +} -// Recv implements rangeFeedEventProducer interface. -func (c *channelRangeFeedEventProducer) Recv() (*kvpb.RangeFeedEvent, error) { - select { - case <-c.callerCtx.Done(): - return nil, c.callerCtx.Err() - case <-c.muxClientCtx.Done(): - return nil, c.muxClientCtx.Err() - case e := <-c.eventCh: - return e, nil - } +// the "Recv" portion of the kvpb.Internal_MuxRangeFeedClient. +type muxRangeFeedEventReceiver interface { + Recv() (*kvpb.MuxRangeFeedEvent, error) } -// startMuxRangeFeed begins the execution of rangefeed for the specified -// RangeFeedRequest. -// The passed in client is only needed to establish MuxRangeFeed RPC. -func (m *rangefeedMuxer) startMuxRangeFeed( - ctx context.Context, client rpc.RestrictedInternalClient, req *kvpb.RangeFeedRequest, -) (kvpb.RangeFeedEventProducer, func(), error) { - ms, err := m.establishMuxConnection(ctx, client, req.Replica.NodeID) - if err != nil { - return nil, nil, err +// startSingleRangeFeed looks up routing information for the +// span, and begins execution of rangefeed. +func (m *rangefeedMuxer) startSingleRangeFeed( + ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, +) error { + // Bound the partial rangefeed to the partial span. 
+ span := rs.AsRawSpanWithNoLocals() + + var releaseTransport func() + maybeReleaseTransport := func() { + if releaseTransport != nil { + releaseTransport() + releaseTransport = nil + } } + defer maybeReleaseTransport() - req.StreamID = atomic.AddInt64(&m.seqID, 1) - streamCtx := logtags.AddTag(ctx, "stream", req.StreamID) - producer := &channelRangeFeedEventProducer{ - callerCtx: streamCtx, - muxClientCtx: ms.doneCtx, - streamID: req.StreamID, - eventCh: make(chan *kvpb.RangeFeedEvent), + // Before starting single rangefeed, acquire catchup scan quota. + catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem) + if err != nil { + return err } - m.producers.Store(req.StreamID, unsafe.Pointer(producer)) - if log.V(1) { - log.Info(streamCtx, "starting rangefeed") + // Register active mux range feed. + stream := &activeMuxRangeFeed{ + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges), + startAfter: startAfter, + catchupRes: catchupRes, + token: token, } + streamID := atomic.AddInt64(&m.seqID, 1) + + // stream ownership gets transferred (indicated by setting stream to nil) when + // we successfully send request. If we fail to do so, cleanup. + defer func() { + if stream != nil { + stream.release() + } + }() + + // Start a retry loop for sending the batch to the range. + for r := retry.StartWithCtx(ctx, m.ds.rpcRetryOptions); r.Next(); { + maybeReleaseTransport() + + // If we've cleared the descriptor on failure, re-lookup. + if !token.Valid() { + var err error + ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false) + if err != nil { + log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err) + if !rangecache.IsRangeLookupErrorRetryable(err) { + return err + } + continue + } + token = ri + } - cleanup := func() { - m.producers.Delete(req.StreamID) + // Establish a RangeFeed for a single Range. + log.VEventf(ctx, 1, "MuxRangeFeed starting for range %s@%s (rangeID %d)", + span, startAfter, token.Desc().RangeID) + transport, err := newTransportForRange(ctx, token.Desc(), m.ds) + if err != nil { + log.VErrEventf(ctx, 1, "Failed to create transport for %s ", token.String()) + continue + } + releaseTransport = transport.Release - m.mu.Lock() - defer m.mu.Unlock() + for !transport.IsExhausted() { + args := makeRangeFeedRequest(span, token.Desc().RangeID, m.cfg.overSystemTable, startAfter, m.cfg.withDiff) + args.Replica = transport.NextReplica() + args.StreamID = streamID - ms.numStreams-- - if ms.numStreams == 0 { - delete(m.mu.clients, req.Replica.NodeID) - if log.V(1) { - log.InfofDepth(streamCtx, 1, "shut down inactive mux for node %d", req.Replica.NodeID) + rpcClient, err := transport.NextInternalClient(ctx) + if err != nil { + log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err) + continue + } + + conn, err := m.establishMuxConnection(ctx, rpcClient, args.Replica.NodeID) + if err != nil { + return err + } + + if err := conn.startRangeFeed(streamID, stream, &args); err != nil { + log.VErrEventf(ctx, 1, + "RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err) + continue } - ms.cancel() + // We successfully established rangefeed, so the responsibility + // for releasing the stream is transferred to the mux go routine. + stream = nil + return nil } - } - if err := ms.client.Send(req); err != nil { - cleanup() - return nil, nil, err + // If the transport is exhausted, we evict routing token and retry range + // resolution. 
+ token.Evict(ctx) + token = rangecache.EvictionToken{} } - return producer, cleanup, nil + + return ctx.Err() } // establishMuxConnection establishes MuxRangeFeed RPC with the node. func (m *rangefeedMuxer) establishMuxConnection( ctx context.Context, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, -) (*muxClientState, error) { - // NB: the `ctx` in scope here belongs to a client for a single range feed, and must - // not influence the lifetime of the mux connection. At the time of writing, the caller - // is `singleRangeFeed` which calls into this method through its streamProducerFactory - // argument. - m.mu.Lock() - ms, found := m.mu.clients[nodeID] - if !found { - // Initialize muxClientState. - // Only initCtx is initialized here since we need to block on it. - // The rest of the initialization happens in startNodeMuxRangeFeed. - ms = &muxClientState{initCtx: makeTerminationContext()} - // Kick off client initialization on another Go routine. - // It is important that we start MuxRangeFeed RPC using long-lived - // context available in the main context group used for this muxer. +) (*muxStream, error) { + ptr, exists := m.muxClients.LoadOrStore(int64(nodeID), unsafe.Pointer(future.Make[muxStreamOrError]())) + muxClient := (*future.Future[muxStreamOrError])(ptr) + if !exists { + // Start mux rangefeed go routine responsible for receiving MuxRangeFeedEvents. m.g.GoCtx(func(ctx context.Context) error { - return m.startNodeMuxRangeFeed(ctx, ms, client, nodeID) + return m.startNodeMuxRangeFeed(ctx, client, nodeID, muxClient) }) - m.mu.clients[nodeID] = ms } - ms.numStreams++ - m.mu.Unlock() // Ensure mux client is ready. + init := future.MakeAwaitableFuture(muxClient) select { case <-ctx.Done(): return nil, ctx.Err() - case <-ms.initCtx.Done(): - return ms, ms.initCtx.Err() + case <-init.Done(): + c := init.Get() + return c.stream, c.err } } // startNodeMuxRangeFeedLocked establishes MuxRangeFeed RPC with the node. func (m *rangefeedMuxer) startNodeMuxRangeFeed( ctx context.Context, - ms *muxClientState, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, -) error { + stream *future.Future[muxStreamOrError], +) (retErr error) { ctx = logtags.AddTag(ctx, "mux_n", nodeID) // Add "generation" number to the context so that log messages and stacks can // differentiate between multiple instances of mux rangefeed Go routine @@ -222,141 +293,249 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( defer restore() if log.V(1) { - log.Info(ctx, "Establishing MuxRangeFeed") + log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID) start := timeutil.Now() defer func() { - log.Infof(ctx, "MuxRangeFeed terminating after %s", timeutil.Since(start)) + log.Infof(ctx, "MuxRangeFeed to node %d terminating after %s with err=%v", + nodeID, timeutil.Since(start), retErr) }() } - doneCtx := makeTerminationContext() - ms.doneCtx = &doneCtx ctx, cancel := context.WithCancel(ctx) + defer cancel() + + mux, err := client.MuxRangeFeed(ctx) + if err != nil { + return future.MustSet(stream, muxStreamOrError{err: err}) + } - ms.cancel = func() { - cancel() - doneCtx.close(context.Canceled) + ms := muxStream{nodeID: nodeID} + ms.mu.sender = mux + if err := future.MustSet(stream, muxStreamOrError{stream: &ms}); err != nil { + return err } - defer ms.cancel() - - // NB: it is important that this Go routine never returns an error. Errors - // should be propagated to the caller either via initCtx.err, or doneCtx.err. - // We do this to make sure that this error does not kill entire context group. 
- // We want the caller (singleRangeFeed) to decide if this error is retry-able. - var err error - ms.client, err = client.MuxRangeFeed(ctx) - ms.initCtx.close(err) - - if err == nil { - err = m.receiveEventsFromNode(ctx, ms) + + stuckWatcher := newStuckRangeFeedCanceler(cancel, defaultStuckRangeThreshold(m.ds.st)) + defer stuckWatcher.stop() + + if recvErr := m.receiveEventsFromNode(ctx, mux, stuckWatcher, &ms); recvErr != nil { + // Clear out this client, and restart all streams on this node. + // Note: there is a race here where we may delete this muxClient, while + // another go routine loaded it. That's fine, since we would not + // be able to send new request on this stream anymore, and we'll retry + // against another node. + m.muxClients.Delete(int64(nodeID)) + + if recvErr == io.EOF { + recvErr = nil + } + + return ms.closeWithRestart(ctx, recvErr, func(_ int64, a *activeMuxRangeFeed) error { + return m.restartActiveRangeFeed(ctx, a, recvErr) + }) } - doneCtx.close(err) - // We propagated error to the caller via init/done context. - return nil //nolint:returnerrcheck + return nil } -// demuxLoop de-multiplexes events and sends them to appropriate rangefeed event -// consumer. -func (m *rangefeedMuxer) demuxLoop(ctx context.Context) (retErr error) { - defer close(m.demuxLoopDone) +// receiveEventsFromNode receives mux rangefeed events from a node. +func (m *rangefeedMuxer) receiveEventsFromNode( + ctx context.Context, + receiver muxRangeFeedEventReceiver, + stuckWatcher *stuckRangeFeedCanceler, + ms *muxStream, +) error { + stuckThreshold := defaultStuckRangeThreshold(m.ds.st) + stuckCheckFreq := func() time.Duration { + if threshold := stuckThreshold(); threshold > 0 { + return threshold + } + return time.Minute + } + nextStuckCheck := timeutil.Now().Add(stuckCheckFreq()) + var event *kvpb.MuxRangeFeedEvent for { - select { - case <-ctx.Done(): - return ctx.Err() - case e := <-m.eventCh: - var producer *channelRangeFeedEventProducer - if v, found := m.producers.Load(e.StreamID); found { - producer = (*channelRangeFeedEventProducer)(v) + if err := stuckWatcher.do(func() (err error) { + event, err = receiver.Recv() + return err + }); err != nil { + return err + } + + active := ms.lookupStream(event.StreamID) + + // The stream may already have terminated. That's fine -- we may have + // encountered range split or similar rangefeed error, causing the caller to + // exit (and terminate this stream), but the server side stream termination + // is async and probabilistic (rangefeed registration output loop may have a + // checkpoint event available, *and* it may have context cancellation, but + // which one executes is a coin flip) and so it is possible that we may see + // additional event(s) arriving for a stream that is no longer active. + if active == nil { + if log.V(1) { + log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event) } + continue + } - // The stream may already have terminated (either producer is nil, or - // producer.muxClientCtx.Done()). That's fine -- we may have encountered range - // split or similar rangefeed error, causing the caller to exit (and - // terminate this stream), but the server side stream termination is async - // and probabilistic (rangefeed registration output loop may have a - // checkpoint event available, *and* it may have context cancellation, but - // which one executes is a coin flip) and so it is possible that we may - // see additional event(s) arriving for a stream that is no longer active. 
- if producer == nil { - if log.V(1) { - log.Infof(ctx, "received stray event stream %d: %v", e.StreamID, e) + switch t := event.GetValue().(type) { + case *kvpb.RangeFeedCheckpoint: + if t.Span.Contains(active.Span) { + // If we see the first non-empty checkpoint, we know we're done with the catchup scan. + if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { + active.catchupRes.Release() + active.catchupRes = nil } - continue + // Note that this timestamp means that all rows in the span with + // writes at or before the timestamp have now been seen. The + // Timestamp field in the request is exclusive, meaning if we send + // the request with exactly the ResolveTS, we'll see only rows after + // that timestamp. + active.startAfter.Forward(t.ResolvedTS) + } + case *kvpb.RangeFeedError: + log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) + if active.catchupRes != nil { + m.ds.metrics.RangefeedErrorCatchup.Inc(1) + } + ms.streams.Delete(event.StreamID) + if err := m.restartActiveRangeFeed(ctx, active, t.Error.GoError()); err != nil { + return err } + continue + } - select { - case <-ctx.Done(): - return ctx.Err() - case producer.eventCh <- &e.RangeFeedEvent: - case <-producer.callerCtx.Done(): - if log.V(1) { - log.Infof(ctx, "received stray event, but caller exited: stream=%d: e=%v", e.StreamID, e) - } - case <-producer.muxClientCtx.Done(): - if log.V(1) { - log.Infof(ctx, "received stray event, but node mux exited: stream=%d: e=%v", e.StreamID, e) + active.onRangeEvent(ms.nodeID, event.RangeID, &event.RangeFeedEvent) + msg := RangeFeedMessage{RangeFeedEvent: &event.RangeFeedEvent, RegisteredSpan: active.Span} + select { + case <-ctx.Done(): + return ctx.Err() + case m.eventCh <- msg: + } + + // Piggyback on this loop to check if any of the active ranges + // on this node appear to be stuck. + // NB: this does not notify the server in any way. We may have to add + // a more complex protocol -- or better yet, figure out why ranges + // get stuck in the first place. + if timeutil.Now().Before(nextStuckCheck) { + if threshold := stuckThreshold(); threshold > 0 { + if _, err := ms.eachStream(func(id int64, a *activeMuxRangeFeed) error { + if !a.startAfter.IsEmpty() && timeutil.Since(a.startAfter.GoTime()) > stuckThreshold() { + ms.streams.Delete(id) + return m.restartActiveRangeFeed(ctx, a, errRestartStuckRange) + } + return nil + }); err != nil { + return err } } + nextStuckCheck = timeutil.Now().Add(stuckCheckFreq()) } } } -// terminationContext (inspired by context.Context) describes -// termination information. -type terminationContext interface { - Done() <-chan struct{} - Err() error -} +// restartActiveRangeFeed restarts rangefeed after it encountered "reason" error. +func (m *rangefeedMuxer) restartActiveRangeFeed( + ctx context.Context, active *activeMuxRangeFeed, reason error, +) error { + if log.V(1) { + log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", + active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason) + } + active.setLastError(reason) + active.release() -// termCtx implements terminationContext, and allows error to be set. -type termCtx struct { - sync.Once - done chan struct{} - err error -} + errInfo, err := handleRangefeedError(ctx, reason) + if err != nil { + // If this is an error we cannot recover from, terminate the rangefeed. 
+ return err + } -func makeTerminationContext() termCtx { - return termCtx{done: make(chan struct{})} -} + if errInfo.evict && active.token.Valid() { + active.token.Evict(ctx) + active.token = rangecache.EvictionToken{} + } -func (tc *termCtx) Done() <-chan struct{} { - return tc.done -} -func (tc *termCtx) Err() error { - return tc.err + rs, err := keys.SpanAddr(active.Span) + if err != nil { + return err + } + + if errInfo.resolveSpan { + return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed) + } + return m.startSingleRangeFeed(ctx, rs, active.startAfter, active.token) } -// close closes this context with specified error. -func (tc *termCtx) close(err error) { - tc.Do(func() { - tc.err = err - close(tc.done) - }) +// startRangeFeed initiates rangefeed for the specified request running +// on this node connection. If no error returned, registers stream +// with this connection. Otherwise, stream is not registered. +func (c *muxStream) startRangeFeed( + streamID int64, stream *activeMuxRangeFeed, req *kvpb.RangeFeedRequest, +) error { + // NB: lock must be held for the duration of this method. + c.mu.Lock() + defer c.mu.Unlock() + + if c.mu.closed { + return net.ErrClosed + } + + // Concurrent Send calls are not thread safe; thus Send calls must be + // synchronized. + if err := c.mu.sender.Send(req); err != nil { + return err + } + + // mu must be held while marking this stream in flight (streams.Store) to + // synchronize with mux termination. When node mux terminates, it invokes + // c.closeWithRestart(), which marks this mux stream connection closed and + // restarts all active streams. Thus, we must make sure that this streamID + // gets properly recorded even if mux go routine terminates right after the + // above sender.Send() succeeded. + c.streams.Store(streamID, unsafe.Pointer(stream)) + return nil } -// receiveEventsFromNode receives mux rangefeed events, and forwards them to the -// demuxLoop. -// Passed in context must be the context used to create ms.client. -func (m *rangefeedMuxer) receiveEventsFromNode(ctx context.Context, ms *muxClientState) error { - for { - event, streamErr := ms.client.Recv() +func (c *muxStream) lookupStream(streamID int64) *activeMuxRangeFeed { + v, ok := c.streams.Load(streamID) + if ok { + return (*activeMuxRangeFeed)(v) + } + return nil +} - if streamErr != nil { - return streamErr - } +func (c *muxStream) closeWithRestart( + ctx context.Context, reason error, restartFn func(streamID int64, a *activeMuxRangeFeed) error, +) error { + c.mu.Lock() + c.mu.closed = true + c.mu.Unlock() + + // make sure that the underlying error is not fatal. If it is, there is no + // reason to restart each rangefeed, so just bail out. + if _, err := handleRangefeedError(ctx, reason); err != nil { + return err + } - select { - case <-ctx.Done(): - // Normally, when ctx is done, we would receive streamErr above. - // But it's possible that the context was canceled right after the last Recv(), - // and in that case we must exit. - return ctx.Err() - case <-m.demuxLoopDone: - // demuxLoop exited, and so should we (happens when main context group completes) - return errors.Wrapf(context.Canceled, "demux loop terminated") - case m.eventCh <- event: - } + n, err := c.eachStream(restartFn) + if log.V(1) { + log.Infof(ctx, "mux to node %d restarted %d streams: err=%v", c.nodeID, n, err) } + return err +} + +// eachStream invokes provided function for each stream. If the function +// returns an error, iteration stops. 
Returns number of streams processed. +func (c *muxStream) eachStream( + fn func(streamID int64, a *activeMuxRangeFeed) error, +) (n int, err error) { + c.streams.Range(func(key int64, value unsafe.Pointer) bool { + err = fn(key, (*activeMuxRangeFeed)(value)) + n++ + return err == nil + }) + return n, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 2c75ef7c72ab..3f2fb1d92e1d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -37,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -88,6 +90,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { type rangeFeedConfig struct { useMuxRangeFeed bool overSystemTable bool + withDiff bool } // RangeFeedOption configures a RangeFeed. @@ -114,6 +117,13 @@ func WithSystemTablePriority() RangeFeedOption { }) } +// WithDiff turns on "diff" option for the rangefeed. +func WithDiff() RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.withDiff = true + }) +} + // A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) @@ -134,7 +144,6 @@ func (ds *DistSender) RangeFeed( ctx context.Context, spans []roachpb.Span, startAfter hlc.Timestamp, // exclusive - withDiff bool, eventCh chan<- RangeFeedMessage, opts ...RangeFeedOption, ) error { @@ -145,7 +154,7 @@ func (ds *DistSender) RangeFeed( StartAfter: startAfter, }) } - return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh, opts...) + return ds.RangeFeedSpans(ctx, timedSpans, eventCh, opts...) } // SpanTimePair is a pair of span along with its starting time. The starting @@ -156,12 +165,15 @@ type SpanTimePair struct { StartAfter hlc.Timestamp // exclusive } +func (p SpanTimePair) String() string { + return fmt.Sprintf("%s@%s", p.Span, p.StartAfter) +} + // RangeFeedSpans is similar to RangeFeed but allows specification of different // starting time for each span. 
func (ds *DistSender) RangeFeedSpans( ctx context.Context, spans []SpanTimePair, - withDiff bool, eventCh chan<- RangeFeedMessage, opts ...RangeFeedOption, ) error { @@ -178,26 +190,21 @@ func (ds *DistSender) RangeFeedSpans( ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender") defer sp.Finish() - rr := newRangeFeedRegistry(ctx, withDiff) + rr := newRangeFeedRegistry(ctx, cfg.withDiff) ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) catchupSem := limit.MakeConcurrentRequestLimiter( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) - g := ctxgroup.WithContext(ctx) - - var eventProducer rangeFeedEventProducerFactory if ds.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2RangefeedUseOneStreamPerNode) && enableMuxRangeFeed && cfg.useMuxRangeFeed { - m := newRangefeedMuxer(g) - eventProducer = m.startMuxRangeFeed - } else { - eventProducer = legacyRangeFeedEventProducer + return muxRangeFeed(ctx, cfg, spans, ds, rr, &catchupSem, eventCh) } // Goroutine that processes subdivided ranges and creates a rangefeed for // each. + g := ctxgroup.WithContext(ctx) rangeCh := make(chan singleRangeInfo, 16) g.GoCtx(func(ctx context.Context) error { for { @@ -205,8 +212,8 @@ func (ds *DistSender) RangeFeedSpans( case sri := <-rangeCh: // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter, - sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg) + return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, + sri.token, &catchupSem, rangeCh, eventCh, cfg) }) case <-ctx.Done(): return ctx.Err() @@ -215,6 +222,17 @@ func (ds *DistSender) RangeFeedSpans( }) // Kick off the initial set of ranges. + divideAllSpansOnRangeBoundaries(spans, sendSingleRangeInfo(rangeCh), ds, &g) + + return g.Wait() +} + +// divideAllSpansOnRangeBoundaries divides all spans on range boundaries and invokes +// provided onRange function for each range. Resolution happens concurrently using provided +// context group. +func divideAllSpansOnRangeBoundaries( + spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, +) { for _, s := range spans { func(stp SpanTimePair) { g.GoCtx(func(ctx context.Context) error { @@ -222,12 +240,10 @@ func (ds *DistSender) RangeFeedSpans( if err != nil { return err } - return ds.divideAndSendRangeFeedToRanges(ctx, rs, stp.StartAfter, rangeCh) + return divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange) }) }(s) } - - return g.Wait() } // RangeFeedContext is the structure containing arguments passed to @@ -287,6 +303,7 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr // activeRangeFeed is a thread safe PartialRangeFeed. 
type activeRangeFeed struct { + release func() syncutil.Mutex PartialRangeFeed } @@ -334,8 +351,31 @@ func newRangeFeedRegistry(ctx context.Context, withDiff bool) *rangeFeedRegistry return rr } -func (ds *DistSender) divideAndSendRangeFeedToRanges( - ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, rangeCh chan<- singleRangeInfo, +func sendSingleRangeInfo(rangeCh chan<- singleRangeInfo) onRangeFn { + return func(ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken) error { + select { + case rangeCh <- singleRangeInfo{ + rs: rs, + startAfter: startAfter, + token: token, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +type onRangeFn func( + ctx context.Context, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, +) error + +func divideSpanOnRangeBoundaries( + ctx context.Context, + ds *DistSender, + rs roachpb.RSpan, + startAfter hlc.Timestamp, + onRange onRangeFn, ) error { // As RangeIterator iterates, it can return overlapping descriptors (and // during splits, this happens frequently), but divideAndSendRangeFeedToRanges @@ -351,14 +391,8 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return err } nextRS.Key = partialRS.EndKey - select { - case rangeCh <- singleRangeInfo{ - rs: partialRS, - startAfter: startAfter, - token: ri.Token(), - }: - case <-ctx.Done(): - return ctx.Err() + if err := onRange(ctx, partialRS, startAfter, ri.Token()); err != nil { + return err } if !ri.NeedAnother(nextRS) { break @@ -367,6 +401,29 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return ri.Error() } +// newActiveRangeFeed registers active rangefeed with rangefeedRegistry. +// The caller must call active.release() in order to cleanup. +func newActiveRangeFeed( + span roachpb.Span, startAfter hlc.Timestamp, rr *rangeFeedRegistry, c *metric.Gauge, +) *activeRangeFeed { + // Register partial range feed with registry. + active := &activeRangeFeed{ + PartialRangeFeed: PartialRangeFeed{ + Span: span, + StartAfter: startAfter, + CreatedTime: timeutil.Now(), + }, + release: func() { + rr.ranges.Delete(active) + c.Dec(1) + }, + } + rr.ranges.Store(active, nil) + c.Inc(1) + + return active +} + // partialRangeFeed establishes a RangeFeed to the range specified by desc. It // manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry @@ -374,11 +431,9 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( func (ds *DistSender) partialRangeFeed( ctx context.Context, rr *rangeFeedRegistry, - streamProducerFactory rangeFeedEventProducerFactory, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, - withDiff bool, catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, @@ -388,17 +443,8 @@ func (ds *DistSender) partialRangeFeed( span := rs.AsRawSpanWithNoLocals() // Register partial range feed with registry. - active := &activeRangeFeed{ - PartialRangeFeed: PartialRangeFeed{ - Span: span, - StartAfter: startAfter, - CreatedTime: timeutil.Now(), - }, - } - rr.ranges.Store(active, nil) - ds.metrics.RangefeedRanges.Inc(1) - defer rr.ranges.Delete(active) - defer ds.metrics.RangefeedRanges.Dec(1) + active := newActiveRangeFeed(span, startAfter, rr, ds.metrics.RangefeedRanges) + defer active.release() // Start a retry loop for sending the batch to the range. 
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { @@ -422,79 +468,172 @@ func (ds *DistSender) partialRangeFeed( } maxTS, err := ds.singleRangeFeed( - ctx, span, startAfter, withDiff, token.Desc(), - catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg) + ctx, span, startAfter, token.Desc(), + catchupSem, eventCh, active.onRangeEvent, cfg) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) - if err != nil { - active.setLastError(err) + if log.V(1) { + log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", + active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), err) + } + active.setLastError(err) - if log.V(1) { - log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", - span, startAfter, timeutil.Since(startAfter.GoTime()), err) - } - switch { - case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) || - errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)): - // These errors are likely to be unique to the replica that - // reported them, so no action is required before the next - // retry. - case errors.Is(err, errRestartStuckRange): - // Stuck ranges indicate a bug somewhere in the system. We are being - // defensive and attempt to restart this rangefeed. Usually, any - // stuck-ness is cleared out if we just attempt to re-resolve range - // descriptor and retry. - // - // The error contains the replica which we were waiting for. - log.Warningf(ctx, "restarting stuck rangefeed: %s", err) - token.Evict(ctx) - token = rangecache.EvictionToken{} - continue - case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)): - // Evict the descriptor from the cache and reload on next attempt. - token.Evict(ctx) - token = rangecache.EvictionToken{} - continue - case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)): - // Evict the descriptor from the cache. - token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startAfter, rangeCh) - case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)): - var t *kvpb.RangeFeedRetryError - if ok := errors.As(err, &t); !ok { - return errors.AssertionFailedf("wrong error type: %T", err) - } - switch t.Reason { - case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, - kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, - kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, - kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: - // Try again with same descriptor. These are transient - // errors that should not show up again. - continue - case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, - kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, - kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the descriptor from the cache. - token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startAfter, rangeCh) - default: - return errors.AssertionFailedf("unrecognized retryable error type: %T", err) - } - default: - return err - } + errInfo, err := handleRangefeedError(ctx, err) + if err != nil { + return err + } + if errInfo.evict { + token.Evict(ctx) + token = rangecache.EvictionToken{} + } + if errInfo.resolveSpan { + return divideSpanOnRangeBoundaries(ctx, ds, rs, active.StartAfter, sendSingleRangeInfo(rangeCh)) } } return ctx.Err() } +type rangefeedErrorInfo struct { + resolveSpan bool // true if the span resolution needs to be performed, and rangefeed restarted. + evict bool // true if routing info needs to be updated prior to retry. 
+}
+
+// handleRangefeedError handles an error that occurred while running rangefeed.
+// Returns rangefeedErrorInfo describing how the error should be handled for the
+// range. Returns an error if the entire rangefeed should terminate.
+func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, error) {
+	if err == nil {
+		return rangefeedErrorInfo{}, nil
+	}
+
+	switch {
+	case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) ||
+		errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
+		// These errors are likely to be unique to the replica that
+		// reported them, so no action is required before the next
+		// retry.
+		return rangefeedErrorInfo{}, nil
+	case errors.Is(err, errRestartStuckRange):
+		// Stuck ranges indicate a bug somewhere in the system. We are being
+		// defensive and attempt to restart this rangefeed. Usually, any
+		// stuck-ness is cleared out if we just attempt to re-resolve range
+		// descriptor and retry.
+		//
+		// The error contains the replica which we were waiting for.
+		log.Warningf(ctx, "restarting stuck rangefeed: %s", err)
+		return rangefeedErrorInfo{evict: true}, nil
+	case IsSendError(err), errors.HasType(err, (*kvpb.RangeNotFoundError)(nil)):
+		return rangefeedErrorInfo{evict: true}, nil
+	case errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)):
+		return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil
+	case errors.HasType(err, (*kvpb.RangeFeedRetryError)(nil)):
+		var t *kvpb.RangeFeedRetryError
+		if ok := errors.As(err, &t); !ok {
+			return rangefeedErrorInfo{}, errors.AssertionFailedf("wrong error type: %T", err)
+		}
+		switch t.Reason {
+		case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED,
+			kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
+			kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
+			kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER:
+			// Try again with same descriptor. These are transient
+			// errors that should not show up again.
+			return rangefeedErrorInfo{}, nil
+		case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
+			kvpb.RangeFeedRetryError_REASON_RANGE_MERGED,
+			kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
+			return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil
+		default:
+			return rangefeedErrorInfo{}, errors.AssertionFailedf("unrecognized retryable error type: %T", err)
+		}
+	default:
+		return rangefeedErrorInfo{}, err
+	}
+}
+
+func acquireCatchupScanQuota(
+	ctx context.Context, ds *DistSender, catchupSem *limit.ConcurrentRequestLimiter,
+) (limit.Reservation, error) {
+	// Indicate catchup scan is starting; before potentially blocking on a semaphore, take
+	// the opportunity to update the semaphore limit.
+	ds.metrics.RangefeedCatchupRanges.Inc(1)
+	catchupSem.SetLimit(maxConcurrentCatchupScans(&ds.st.SV))
+	return catchupSem.Begin(ctx)
+}
+
+// newTransportForRange returns Transport for the specified range descriptor.
+func newTransportForRange(
+	ctx context.Context, desc *roachpb.RangeDescriptor, ds *DistSender,
+) (Transport, error) {
+	var latencyFn LatencyFunc
+	if ds.rpcContext != nil {
+		latencyFn = ds.rpcContext.RemoteClocks.Latency
+	}
+	replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas)
+	if err != nil {
+		return nil, err
+	}
+	replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
+	opts := SendOptions{class: connectionClass(&ds.st.SV)}
+	return ds.transportFactory(opts, ds.nodeDialer, replicas)
+}
+
 // onRangeEventCb is invoked for each non-error range event.
 // nodeID identifies the node ID which generated the event.
type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *kvpb.RangeFeedEvent) +// makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and +// rangeID. Request is constructed to watch event after specified timestamp, and +// with optional diff. If the request corresponds to a system range, request +// receives higher admission priority. +func makeRangeFeedRequest( + span roachpb.Span, + rangeID roachpb.RangeID, + isSystemRange bool, + startAfter hlc.Timestamp, + withDiff bool, +) kvpb.RangeFeedRequest { + admissionPri := admissionpb.BulkNormalPri + if isSystemRange { + admissionPri = admissionpb.NormalPri + } + return kvpb.RangeFeedRequest{ + Span: span, + Header: kvpb.Header{ + Timestamp: startAfter, + RangeID: rangeID, + }, + WithDiff: withDiff, + AdmissionHeader: kvpb.AdmissionHeader{ + // NB: AdmissionHeader is used only at the start of the range feed + // stream since the initial catch-up scan is expensive. + Priority: int32(admissionPri), + CreateTime: timeutil.Now().UnixNano(), + Source: kvpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + }, + } +} + +func defaultStuckRangeThreshold(st *cluster.Settings) func() time.Duration { + return func() time.Duration { + // Before the introduction of kv.rangefeed.range_stuck_threshold = 1m, + // clusters may already have kv.closed_timestamp.side_transport_interval set + // to >1m. This would cause rangefeeds to continually restart. We therefore + // conservatively use the highest value. + threshold := rangefeedRangeStuckThreshold.Get(&st.SV) + if threshold > 0 { + if t := time.Duration(math.Round( + 1.2 * float64(closedts.SideTransportCloseInterval.Get(&st.SV)))); t > threshold { + threshold = t + } + } + return threshold + } +} + // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed // RPC call. Results will be sent on the provided channel. Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish @@ -505,11 +644,9 @@ func (ds *DistSender) singleRangeFeed( ctx context.Context, span roachpb.Span, startAfter hlc.Timestamp, - withDiff bool, desc *roachpb.RangeDescriptor, catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, - streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, cfg rangeFeedConfig, ) (_ hlc.Timestamp, retErr error) { @@ -522,38 +659,8 @@ func (ds *DistSender) singleRangeFeed( cancelFeed() }() - admissionPri := admissionpb.BulkNormalPri - if cfg.overSystemTable { - admissionPri = admissionpb.NormalPri - } - args := kvpb.RangeFeedRequest{ - Span: span, - Header: kvpb.Header{ - Timestamp: startAfter, - RangeID: desc.RangeID, - }, - WithDiff: withDiff, - AdmissionHeader: kvpb.AdmissionHeader{ - // NB: AdmissionHeader is used only at the start of the range feed - // stream since the initial catch-up scan is expensive. 
- Priority: int32(admissionPri), - CreateTime: timeutil.Now().UnixNano(), - Source: kvpb.AdmissionHeader_FROM_SQL, - NoMemoryReservedAtSource: true, - }, - } - - var latencyFn LatencyFunc - if ds.rpcContext != nil { - latencyFn = ds.rpcContext.RemoteClocks.Latency - } - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) - if err != nil { - return args.Timestamp, err - } - replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality) - opts := SendOptions{class: connectionClass(&ds.st.SV)} - transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + args := makeRangeFeedRequest(span, desc.RangeID, cfg.overSystemTable, startAfter, cfg.withDiff) + transport, err := newTransportForRange(ctx, desc, ds) if err != nil { return args.Timestamp, err } @@ -561,12 +668,11 @@ func (ds *DistSender) singleRangeFeed( // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take // opportunity to update semaphore limit. - ds.metrics.RangefeedCatchupRanges.Inc(1) - catchupSem.SetLimit(maxConcurrentCatchupScans(&ds.st.SV)) - catchupRes, err := catchupSem.Begin(ctx) + catchupRes, err := acquireCatchupScanQuota(ctx, ds, catchupSem) if err != nil { return hlc.Timestamp{}, err } + finishCatchupScan := func() { if catchupRes != nil { catchupRes.Release() @@ -577,20 +683,7 @@ func (ds *DistSender) singleRangeFeed( // cleanup catchup reservation in case of early termination. defer finishCatchupScan() - stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, func() time.Duration { - // Before the introduction of kv.rangefeed.range_stuck_threshold = 1m, - // clusters may already have kv.closed_timestamp.side_transport_interval set - // to >1m. This would cause rangefeeds to continually restart. We therefore - // conservatively use the highest value. - threshold := rangefeedRangeStuckThreshold.Get(&ds.st.SV) - if threshold > 0 { - if t := time.Duration(math.Round( - 1.2 * float64(closedts.SideTransportCloseInterval.Get(&ds.st.SV)))); t > threshold { - threshold = t - } - } - return threshold - }) + stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, defaultStuckRangeThreshold(ds.st)) defer stuckWatcher.stop() var streamCleanup func() @@ -605,8 +698,7 @@ func (ds *DistSender) singleRangeFeed( for { stuckWatcher.stop() // if timer is running from previous iteration, stop it now if transport.IsExhausted() { - return args.Timestamp, newSendError( - fmt.Sprintf("sending to all %d replicas failed", len(replicas))) + return args.Timestamp, newSendError("sending to all replicas failed") } maybeCleanupStream() @@ -624,9 +716,9 @@ func (ds *DistSender) singleRangeFeed( ctx = logtags.AddTag(ctx, "dest_s", args.Replica.StoreID) ctx = logtags.AddTag(ctx, "dest_r", args.RangeID) ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx) + streamCleanup = restore - var stream kvpb.RangeFeedEventProducer - stream, streamCleanup, err = streamProducerFactory(ctx, client, &args) + stream, err := client.RangeFeed(ctx, &args) if err != nil { restore() log.VErrEventf(ctx, 2, "RPC error: %s", err) @@ -636,13 +728,6 @@ func (ds *DistSender) singleRangeFeed( } continue } - { - origStreamCleanup := streamCleanup - streamCleanup = func() { - origStreamCleanup() - restore() - } - } var event *kvpb.RangeFeedEvent for { @@ -676,7 +761,6 @@ func (ds *DistSender) singleRangeFeed( // that timestamp. 
args.Timestamp.Forward(t.ResolvedTS) } - case *kvpb.RangeFeedSSTable: case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if catchupRes != nil { @@ -709,22 +793,6 @@ func connectionClass(sv *settings.Values) rpc.ConnectionClass { return rpc.DefaultClass } -type rangeFeedEventProducerFactory func( - ctx context.Context, - client rpc.RestrictedInternalClient, - req *kvpb.RangeFeedRequest, -) (kvpb.RangeFeedEventProducer, func(), error) - -// legacyRangeFeedEventProducer is a rangeFeedEventProducerFactory using -// legacy RangeFeed RPC. -func legacyRangeFeedEventProducer( - ctx context.Context, client rpc.RestrictedInternalClient, req *kvpb.RangeFeedRequest, -) (producer kvpb.RangeFeedEventProducer, cleanup func(), err error) { - cleanup = func() {} - producer, err = client.RangeFeed(ctx, req) - return producer, cleanup, err -} - func (ds *DistSender) handleStuckEvent( args *kvpb.RangeFeedRequest, afterCatchupScan bool, threshold time.Duration, ) error { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index 480b8992ea28..f11cbf6c29d9 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvpb/kvpbmock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -128,13 +129,22 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { // returning a range descriptor and a client that immediately // cancels the context and closes the range feed stream. if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) + rangeDB.EXPECT().FirstRange().MinTimes(1).Return(&desc, nil) client := kvpbmock.NewMockInternalClient(ctrl) if useMuxRangeFeed { + recvCalled := make(chan struct{}) + sendCalled := make(chan struct{}) stream := kvpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) - stream.EXPECT().Send(gomock.Any()).Return(nil) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + stream.EXPECT().Send(gomock.Any()).Do(func(r *kvpb.RangeFeedRequest) { + close(sendCalled) + <-recvCalled + cancel() + }).Return(nil) + stream.EXPECT().Recv().Do(func() { + close(recvCalled) + <-sendCalled + }).Return(nil, io.EOF) client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) } else { stream := kvpbmock.NewMockInternal_RangeFeedClient(ctrl) @@ -172,8 +182,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { if useMuxRangeFeed { opts = append(opts, WithMuxRangeFeed()) } - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, - false, nil, opts...) + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil, opts...) 
 		require.Error(t, err)
 	})
 }
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
index 3b6ecaf1369c..44954854ce0c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -204,7 +204,7 @@ func rangeFeed(
 			opts = append(opts, kvcoord.WithMuxRangeFeed())
 			ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{})
 		}
-		return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events, opts...)
+		return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, events, opts...)
 	})
 	g.GoCtx(func(ctx context.Context) error {
 		for {
@@ -552,7 +552,7 @@ func TestRestartsStuckRangeFeedsSecondImplementation(t *testing.T) {
 	g := ctxgroup.WithContext(ctx)
 	g.GoCtx(func(ctx context.Context) error {
 		defer close(events)
-		err := ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events)
+		err := ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, events)
 		t.Logf("from RangeFeed: %v", err)
 		return err
 	})
diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go
index 0a73d507b058..a2b0cb9dff0f 100644
--- a/pkg/kv/kvclient/rangefeed/db_adapter.go
+++ b/pkg/kv/kvclient/rangefeed/db_adapter.go
@@ -74,11 +74,10 @@ func (dbc *dbAdapter) RangeFeed(
 	ctx context.Context,
 	spans []roachpb.Span,
 	startFrom hlc.Timestamp,
-	withDiff bool,
 	eventC chan<- kvcoord.RangeFeedMessage,
 	opts ...kvcoord.RangeFeedOption,
 ) error {
-	return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
+	return dbc.distSender.RangeFeed(ctx, spans, startFrom, eventC, opts...)
 }
 
 // concurrentBoundAccount is a thread safe bound account.
diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go
index 8c211dcb108d..2e6298b320b8 100644
--- a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go
+++ b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go
@@ -38,10 +38,10 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder {
 }
 
 // RangeFeed mocks base method.
-func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage, arg5 ...kvcoord.RangeFeedOption) error {
+func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 chan<- kvcoord.RangeFeedMessage, arg4 ...kvcoord.RangeFeedOption) error {
 	m.ctrl.T.Helper()
-	varargs := []interface{}{arg0, arg1, arg2, arg3, arg4}
-	for _, a := range arg5 {
+	varargs := []interface{}{arg0, arg1, arg2, arg3}
+	for _, a := range arg4 {
 		varargs = append(varargs, a)
 	}
 	ret := m.ctrl.Call(m, "RangeFeed", varargs...)
@@ -50,9 +50,9 @@ func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.T
 }
 
 // RangeFeed indicates an expected call of RangeFeed.
-func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3, arg4 interface{}, arg5 ...interface{}) *gomock.Call {
+func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3 interface{}, arg4 ...interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4}, arg5...)
+	varargs := append([]interface{}{arg0, arg1, arg2, arg3}, arg4...)
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockDB)(nil).RangeFeed), varargs...)
 }
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go
index 9253ee38d9b6..74d47558e30a 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeed.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -55,7 +55,6 @@ type DB interface {
 		ctx context.Context,
 		spans []roachpb.Span,
 		startFrom hlc.Timestamp,
-		withDiff bool,
 		eventC chan<- kvcoord.RangeFeedMessage,
 		opts ...kvcoord.RangeFeedOption,
 	) error
@@ -299,6 +298,9 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
 	if useMuxRangeFeed {
 		rangefeedOpts = append(rangefeedOpts, kvcoord.WithMuxRangeFeed())
 	}
+	if f.withDiff {
+		rangefeedOpts = append(rangefeedOpts, kvcoord.WithDiff())
+	}
 
 	for i := 0; r.Next(); i++ {
 		ts := frontier.Frontier()
@@ -309,7 +311,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
 		start := timeutil.Now()
 
 		rangeFeedTask := func(ctx context.Context) error {
-			return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...)
+			return f.client.RangeFeed(ctx, f.spans, ts, eventCh, rangefeedOpts...)
 		}
 		processEventsTask := func(ctx context.Context) error {
 			return f.processEvents(ctx, frontier, eventCh)
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
index 54ae877fdd83..1f1e18b8d28f 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
@@ -36,7 +36,6 @@ type mockClient struct {
 		ctx context.Context,
 		spans []roachpb.Span,
 		startFrom hlc.Timestamp,
-		withDiff bool,
 		eventC chan<- kvcoord.RangeFeedMessage,
 	) error
 
@@ -53,11 +52,10 @@ func (m *mockClient) RangeFeed(
 	ctx context.Context,
 	spans []roachpb.Span,
 	startFrom hlc.Timestamp,
-	withDiff bool,
 	eventC chan<- kvcoord.RangeFeedMessage,
 	opts ...kvcoord.RangeFeedOption,
 ) error {
-	return m.rangefeed(ctx, spans, startFrom, withDiff, eventC)
+	return m.rangefeed(ctx, spans, startFrom, eventC)
 }
 
 func (m *mockClient) Scan(
@@ -165,9 +163,8 @@ func TestRangeFeedMock(t *testing.T) {
 				return nil
 			},
 			rangefeed: func(
-				ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage,
+				ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, eventC chan<- kvcoord.RangeFeedMessage,
 			) error {
-				assert.False(t, withDiff) // it was not set
 				sendEvent := func(ts hlc.Timestamp) {
 					eventC <- kvcoord.RangeFeedMessage{
 						RangeFeedEvent: &kvpb.RangeFeedEvent{
@@ -269,9 +266,8 @@ func TestRangeFeedMock(t *testing.T) {
 				return nil
 			},
 			rangefeed: func(
-				ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage,
+				ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, eventC chan<- kvcoord.RangeFeedMessage,
 			) error {
-				assert.True(t, withDiff)
 				eventC <- kvcoord.RangeFeedMessage{
 					RangeFeedEvent: &kvpb.RangeFeedEvent{
 						Val: &kvpb.RangeFeedValue{
@@ -362,11 +358,11 @@ func TestBackoffOnRangefeedFailure(t *testing.T) {
 
 	// Make sure rangefeed is retried even after 3 failures, then succeed and cancel context
 	// (which signals the rangefeed to shut down gracefully).
-	db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
+	db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
 		Times(3).
 		Return(errors.New("rangefeed failed"))
-	db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
-		Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) {
+	db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
+		Do(func(context.Context, []roachpb.Span, hlc.Timestamp, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) {
 			cancel()
 		}).
 		Return(nil)
diff --git a/pkg/kv/kvnemesis/watcher.go b/pkg/kv/kvnemesis/watcher.go
index d301b3a30868..c3ac5d532d4d 100644
--- a/pkg/kv/kvnemesis/watcher.go
+++ b/pkg/kv/kvnemesis/watcher.go
@@ -87,7 +87,7 @@ func Watch(ctx context.Context, env *Env, dbs []*kv.DB, dataSpan roachpb.Span) (
 			w.mu.Unlock()
 
 			ds := dss[i]
-			err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, true /* withDiff */, eventC)
+			err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, eventC, kvcoord.WithDiff())
 			if isRetryableRangeFeedErr(err) {
 				log.Infof(ctx, "got retryable RangeFeed error: %+v", err)
 				continue
diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go
index fcd8efa455f2..b4e068695a0c 100644
--- a/pkg/kv/kvserver/client_rangefeed_test.go
+++ b/pkg/kv/kvserver/client_rangefeed_test.go
@@ -77,7 +77,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) {
 		rangefeedErrChan := make(chan error, 1)
 		ctxToCancel, cancel := context.WithCancel(ctx)
 		go func() {
-			rangefeedErrChan <- ds.RangeFeed(ctxToCancel, []roachpb.Span{descTableSpan}, startTS, false /* withDiff */, evChan)
+			rangefeedErrChan <- ds.RangeFeed(ctxToCancel, []roachpb.Span{descTableSpan}, startTS, evChan)
 		}()
 
 		// Note: 42 is a system descriptor.
@@ -137,7 +137,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) {
 		})
 		evChan := make(chan kvcoord.RangeFeedMessage)
 		require.Regexp(t, `rangefeeds require the kv\.rangefeed.enabled setting`,
-			ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, false /* withDiff */, evChan))
+			ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, evChan))
 	})
 }
 
@@ -191,7 +191,6 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) {
 		rangefeedErrChan <- ds.RangeFeed(rangefeedCtx,
 			[]roachpb.Span{lhsRepl.Desc().RSpan().AsRawSpanWithNoLocals()},
 			start,
-			false, /* withDiff */
 			eventCh)
 	}()
 
@@ -256,7 +255,6 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
 			rangefeedCtx,
 			[]roachpb.Span{desc.RSpan().AsRawSpanWithNoLocals()},
 			startTS,
-			false, /* withDiff */
 			eventCh,
 		)
 	}()
@@ -310,7 +308,7 @@ func TestRangefeedWorksOnLivenessRange(t *testing.T) {
 	eventC := make(chan kvcoord.RangeFeedMessage)
 	errC := make(chan error, 1)
 	go func() {
-		errC <- ds.RangeFeed(ctx, []roachpb.Span{keys.NodeLivenessSpan}, startTS, false /* withDiff */, eventC)
+		errC <- ds.RangeFeed(ctx, []roachpb.Span{keys.NodeLivenessSpan}, startTS, eventC)
	}()
 
 	// Wait for a liveness update.
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 316f6c950181..123c55eac771 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1160,7 +1160,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
 			span := roachpb.Span{
 				Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(),
 			}
-			rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh)
+			rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh)
 		}()
 	}
 
@@ -1309,7 +1309,7 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
 		span := roachpb.Span{
 			Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(),
 		}
-		rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh)
+		rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh)
 	}()
 
 	// Wait for a checkpoint above ts.
@@ -1490,7 +1490,7 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) {
 	}
 	startRangefeed := func() {
 		span := rangefeedSpan
-		rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh)
+		rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh)
 	}
 
 	// Wait for a checkpoint above ts.
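Taken together, these hunks migrate callers of DistSender.RangeFeed (and the rangefeed DB interface) from the positional withDiff bool to the functional-option form, mirroring the existing WithMuxRangeFeed option. A minimal caller sketch of the new shape, not taken from this diff: the helper name startFeed and its parameters are hypothetical, while the kvcoord/roachpb/hlc identifiers are the ones exercised above.

	// startFeed is a hypothetical wrapper illustrating the new API shape:
	// prev-value emission is requested via kvcoord.WithDiff() rather than a
	// positional bool argument.
	func startFeed(
		ctx context.Context,
		ds *kvcoord.DistSender,
		span roachpb.Span,
		startTS hlc.Timestamp,
		eventC chan<- kvcoord.RangeFeedMessage,
		withDiff bool,
	) error {
		var opts []kvcoord.RangeFeedOption
		if withDiff {
			// Old call site: ds.RangeFeed(ctx, spans, startTS, true /* withDiff */, eventC)
			opts = append(opts, kvcoord.WithDiff())
		}
		// Old call site without diff simply omitted the option-equivalent flag.
		return ds.RangeFeed(ctx, []roachpb.Span{span}, startTS, eventC, opts...)
	}

This matches the pattern rangefeed.go now uses internally: options are accumulated once and forwarded, so adding further per-feed behavior does not require another signature change.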