Skip to content

Commit

Permalink
kvserver: self delegated snapshots
Browse files Browse the repository at this point in the history
This commit adds a new rpc stream for sending raft message requests between
replicas which allows for delegating snapshots. Currently this patch implements
the leaseholder delegating to itself, but in future patches the leaseholder
will be able to delegate snapshot sending to follower replicas. A new message
request type of `DelegatedSnapshotRequest` includes a `SnapshotRequest` and the
replica descriptor of the new sender replica. This allows the leaseholder to
fill in some snapshot metadata before delegating to the new sender store to
generate the snapshot and transmit it to the recipient.

Fixes: cockroachdb#42491
Release note: None

Release justification:
  • Loading branch information
amygao9 committed Mar 18, 2022
1 parent 0e53a5c commit 83942c8
Show file tree
Hide file tree
Showing 13 changed files with 505 additions and 44 deletions.
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot(
return store.HandleSnapshot(ctx, header, respStream)
}

func (h *testClusterStoreRaftMessageHandler) SendDelegatedSnapshot(
ctx context.Context,
req *kvserverpb.DelegatedSnapshotRequest,
respStream kvserver.DelegateSnapshotResponseStream,
) error {
store, err := h.getStore()
if err != nil {
return err
}
return store.SendDelegatedSnapshot(ctx, req, respStream)
}

// testClusterPartitionedRange is a convenient abstraction to create a range on a node
// in a multiTestContext which can be partitioned and unpartitioned.
type testClusterPartitionedRange struct {
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3452,6 +3452,14 @@ func (errorChannelTestHandler) HandleSnapshot(
panic("unimplemented")
}

func (errorChannelTestHandler) SendDelegatedSnapshot(
_ context.Context,
req *kvserverpb.DelegatedSnapshotRequest,
respStream kvserver.DelegateSnapshotResponseStream,
) error {
panic("unimplemented")
}

// This test simulates a scenario where one replica has been removed from the
// range's Raft group but it is unaware of the fact. We check that this replica
// coming back from the dead cannot cause elections.
Expand All @@ -3460,10 +3468,12 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 4,
tc := testcluster.StartTestCluster(
t, 4,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
},
)
defer tc.Stopper().Stop(ctx)

// Move the first range from the first node to the other three.
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ message SnapshotRequest {
reserved 3;
}

message DelegatedSnapshotRequest {
SnapshotRequest snapRequest = 1;
roachpb.ReplicaDescriptor delegated_sender = 2 [(gogoproto.nullable) = false];
}

message SnapshotResponse {
enum Status {
UNKNOWN = 0;
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
}
}

err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY)
err := repl.sendDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY)

// NB: if the snapshot fails because of an overlapping replica on the
// recipient which is also waiting for a snapshot, the "smart" thing is to
Expand Down
115 changes: 114 additions & 1 deletion pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ type SnapshotResponseStream interface {
Recv() (*kvserverpb.SnapshotRequest, error)
}

// DelegateSnapshotResponseStream is the subset of the
// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses.
type DelegateSnapshotResponseStream interface {
Send(request *kvserverpb.SnapshotResponse) error
Recv() (*kvserverpb.DelegatedSnapshotRequest, error)
}

// RaftMessageHandler is the interface that must be implemented by
// arguments to RaftTransport.Listen.
type RaftMessageHandler interface {
Expand All @@ -112,7 +119,8 @@ type RaftMessageHandler interface {
// If an error is encountered during asynchronous processing, it will be
// streamed back to the sender of the message as a RaftMessageResponse.
HandleRaftRequest(ctx context.Context, req *kvserverpb.RaftMessageRequest,
respStream RaftMessageResponseStream) *roachpb.Error
respStream RaftMessageResponseStream,
) *roachpb.Error

// HandleRaftResponse is called for each raft response. Note that
// not all messages receive a response. An error is returned if and only if
Expand All @@ -122,6 +130,13 @@ type RaftMessageHandler interface {
// HandleSnapshot is called for each new incoming snapshot stream, after
// parsing the initial SnapshotRequest_Header on the stream.
HandleSnapshot(ctx context.Context, header *kvserverpb.SnapshotRequest_Header, respStream SnapshotResponseStream) error

// SendDelegatedSnapshot is called for each incoming delegated snapshot
// request.
SendDelegatedSnapshot(
ctx context.Context, req *kvserverpb.DelegatedSnapshotRequest,
respStream DelegateSnapshotResponseStream,
) error
}

type raftTransportStats struct {
Expand Down Expand Up @@ -406,6 +421,78 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
}
}

// DelegateRaftSnapshot handles incoming delegated snapshot requests and parses
// the request to pass off to the new sender store.
func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error {
errCh := make(chan error, 1)
taskCtx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context())
defer cancel()
if err := t.stopper.RunAsyncTaskEx(
taskCtx,
stop.TaskOpts{
TaskName: "storage.RaftTransport: processing snapshot",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
errCh <- func() error {
req, err := stream.Recv()
if err != nil {
return err
}
// check to ensure the header is valid.
if req.SnapRequest.Header == nil {
return stream.Send(
&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: "client error: no header in first snapshot request message",
},
)
}
// get the handler of the new sender store.
handler, ok := t.getHandler(req.DelegatedSender.StoreID)
if !ok {
log.Warningf(
ctx, "unable to accept Raft message from leaseholder: %+v: no handler registered for"+
" new sender store"+
" %+v",
req.SnapRequest.Header.RaftMessageRequest.FromReplica.StoreID,
req.DelegatedSender.StoreID,
)
return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID)
}
// acknowledge to the leaseholder that the request has been accepted.
if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil {
return err
}
// pass off the request to the new sender store.
err = handler.SendDelegatedSnapshot(ctx, req, stream)
if err != nil {
log.Infof(ctx, "error: %v", err)
return stream.Send(
&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
},
)
}
return stream.Send(
&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
Message: "accepted!",
},
)
}()
},
); err != nil {
return err
}
select {
case <-t.stopper.ShouldQuiesce():
return nil
case err := <-errCh:
return err
}
}

// RaftSnapshot handles incoming streaming snapshot requests.
func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error {
errCh := make(chan error, 1)
Expand Down Expand Up @@ -705,7 +792,33 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()

return sendSnapshot(
ctx, t.st, stream, storePool, header, snap, newBatch, sent,
)
}

// SendDelegatedSnapshot creates a rpc stream between the leaseholder and the
// new sender for delegated snapshot requests.
func (t *RaftTransport) SendDelegatedSnapshot(
ctx context.Context, storePool *StorePool, req *kvserverpb.DelegatedSnapshotRequest,
) error {
nodeID := req.DelegatedSender.NodeID
conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
}
client := NewMultiRaftClient(conn)

// creates rpc stream between leaseholder and the new sender.
stream, err := client.DelegateRaftSnapshot(ctx)
if err != nil {
return err
}
defer func() {
if err := stream.CloseSend(); err != nil {
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()
return sendDelegatedSnapshot(ctx, stream, storePool, req)
}
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (s channelServer) HandleSnapshot(
panic("unexpected HandleSnapshot")
}

func (s channelServer) SendDelegatedSnapshot(
_ context.Context,
header *kvserverpb.DelegatedSnapshotRequest,
stream kvserver.DelegateSnapshotResponseStream,
) error {
panic("unimplemented")
}

// raftTransportTestContext contains objects needed to test RaftTransport.
// Typical usage will add multiple nodes with AddNode, attach channels
// to at least one store with ListenStore, and send messages with Send.
Expand Down
Loading

0 comments on commit 83942c8

Please sign in to comment.