From b4d46e5cdecfa4589e8796360acbf346a88fa5c3 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 14:24:30 -0400 Subject: [PATCH 1/7] addReadOnlyCmd --- storage/range.go | 52 +++++++++++++++++++------------------------ storage/range_test.go | 20 ++++++++--------- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/storage/range.go b/storage/range.go index 4fe7b30f0a91..dbdd59454198 100644 --- a/storage/range.go +++ b/storage/range.go @@ -507,19 +507,29 @@ func (r *Range) SetLastVerificationTimestamp(timestamp proto.Timestamp) error { // either along the read-only execution path or the read-write Raft // command queue. func (r *Range) AddCmd(ctx context.Context, call proto.Call) error { - args, reply := call.Args, call.Reply + args, cReply := call.Args, call.Reply // TODO(tschottdorf) Some (internal) requests go here directly, so they // won't be traced. trace := tracer.FromCtx(ctx) // Differentiate between admin, read-only and read-write. if proto.IsAdmin(args) { defer trace.Epoch("admin path")() - return r.addAdminCmd(ctx, args, reply) + return r.addAdminCmd(ctx, args, cReply) } else if proto.IsReadOnly(args) { defer trace.Epoch("read path")() - return r.addReadOnlyCmd(ctx, args, reply) + reply, err := r.addReadOnlyCmd(ctx, args) + if reply != nil { + gogoproto.Merge(cReply, reply) + } + if err != nil { + if cReply.Header().Error != nil { + panic("the world is on fire") + } + cReply.Header().SetGoError(err) + } + return err } - return r.addWriteCmd(ctx, args, reply, nil) + return r.addWriteCmd(ctx, args, cReply, nil) } func (r *Range) checkCmdHeader(header *proto.RequestHeader) error { @@ -595,38 +605,29 @@ func (r *Range) addAdminCmd(ctx context.Context, args proto.Request, reply proto // addReadOnlyCmd updates the read timestamp cache and waits for any // overlapping writes currently processing through Raft ahead of us to // clear via the read queue. -func (r *Range) addReadOnlyCmd(ctx context.Context, args proto.Request, reply proto.Response) error { +func (r *Range) addReadOnlyCmd(ctx context.Context, args proto.Request) (proto.Response, error) { header := args.Header() if err := r.checkCmdHeader(header); err != nil { - reply.Header().SetGoError(err) - return err + return nil, err } // If read-consistency is set to INCONSISTENT, run directly. if header.ReadConsistency == proto.INCONSISTENT { // But disallow any inconsistent reads within txns. if header.Txn != nil { - reply.Header().SetGoError(util.Error("cannot allow inconsistent reads within a transaction")) - return reply.Header().GoError() + return nil, util.Error("cannot allow inconsistent reads within a transaction") } if header.Timestamp.Equal(proto.ZeroTimestamp) { header.Timestamp = r.rm.Clock().Now() } - executedReply, intents, err := r.executeCmd(r.rm.Engine(), nil, args) - if executedReply.Header().Error != nil { - panic("the world is on fire") - } - executedReply.Header().SetGoError(err) - reply.Reset() - gogoproto.Merge(reply, executedReply) + reply, intents, err := r.executeCmd(r.rm.Engine(), nil, args) if err == nil { r.handleSkippedIntents(args, intents) } - return err + return reply, err } else if header.ReadConsistency == proto.CONSENSUS { - reply.Header().SetGoError(util.Error("consensus reads not implemented")) - return reply.Header().GoError() + return nil, util.Error("consensus reads not implemented") } // Add the read to the command queue to gate subsequent @@ -636,18 +637,11 @@ func (r *Range) addReadOnlyCmd(ctx context.Context, args proto.Request, reply pr // This replica must have leader lease to process a consistent read. if err := r.redirectOnOrAcquireLeaderLease(tracer.FromCtx(ctx), header.Timestamp); err != nil { r.endCmd(cmdKey, args, err, true /* readOnly */) - reply.Header().SetGoError(err) - return err + return nil, err } // Execute read-only command. - executedReply, intents, err := r.executeCmd(r.rm.Engine(), nil, args) - if executedReply.Header().Error != nil { - panic("the world is on fire") - } - executedReply.Header().SetGoError(err) - reply.Reset() - gogoproto.Merge(reply, executedReply) + reply, intents, err := r.executeCmd(r.rm.Engine(), nil, args) // Only update the timestamp cache if the command succeeded. r.endCmd(cmdKey, args, err, true /* readOnly */) @@ -655,7 +649,7 @@ func (r *Range) addReadOnlyCmd(ctx context.Context, args proto.Request, reply pr if err == nil { r.handleSkippedIntents(args, intents) } - return err + return reply, err } // addWriteCmd first adds the keys affected by this command as pending writes diff --git a/storage/range_test.go b/storage/range_test.go index e92cff4b2aec..9f10d143655f 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -303,21 +303,19 @@ func TestRangeReadConsistency(t *testing.T) { tc.Start(t) defer tc.Stop() - gArgs, gReply := getArgs(proto.Key("a"), 1, tc.store.StoreID()) + gArgs, _ := getArgs(proto.Key("a"), 1, tc.store.StoreID()) gArgs.Timestamp = tc.clock.Now() // Try consistent read and verify success. - if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gReply}); err != nil { - + if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gArgs.CreateReply()}); err != nil { t.Errorf("expected success on consistent read: %s", err) } // Try a consensus read and verify error. gArgs.ReadConsistency = proto.CONSENSUS - if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gReply}); err == nil { - + if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gArgs.CreateReply()}); err == nil { t.Errorf("expected error on consensus read") } @@ -325,8 +323,7 @@ func TestRangeReadConsistency(t *testing.T) { gArgs.ReadConsistency = proto.INCONSISTENT gArgs.Txn = newTransaction("test", proto.Key("a"), 1, proto.SERIALIZABLE, tc.clock) - if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gReply}); err == nil { - + if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gArgs.CreateReply()}); err == nil { t.Errorf("expected error on inconsistent read within a txn") } @@ -342,16 +339,14 @@ func TestRangeReadConsistency(t *testing.T) { gArgs.ReadConsistency = proto.CONSISTENT gArgs.Txn = nil - err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gReply}) - + err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gArgs.CreateReply()}) if _, ok := err.(*proto.NotLeaderError); !ok { t.Errorf("expected not leader error; got %s", err) } gArgs.ReadConsistency = proto.INCONSISTENT - if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gReply}); err != nil { - + if err := tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: gArgs, Reply: gArgs.CreateReply()}); err != nil { t.Errorf("expected success reading with inconsistent: %s", err) } } @@ -2648,6 +2643,7 @@ func TestRangeDanglingMetaIntent(t *testing.T) { rlArgs.Key = keys.RangeMetaKey(proto.Key("A")) rlArgs.Timestamp = proto.ZeroTimestamp + rlReply.Reset() err = tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: rlArgs, Reply: rlReply}) if err != nil { @@ -2659,6 +2655,7 @@ func TestRangeDanglingMetaIntent(t *testing.T) { // Switch to consistent lookups, which should run into the intent. rlArgs.ReadConsistency = proto.CONSISTENT + rlReply.Reset() err = tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: rlArgs, Reply: rlReply}) if _, ok := err.(*proto.WriteIntentError); !ok { t.Fatalf("expected WriteIntentError, not %s", err) @@ -2667,6 +2664,7 @@ func TestRangeDanglingMetaIntent(t *testing.T) { // Try 100 lookups with IgnoreIntents. Expect to see each descriptor at least once. // First, try this consistently, which should not be allowed. rlArgs.IgnoreIntents = true + rlReply.Reset() err = tc.rng.AddCmd(tc.rng.context(), proto.Call{Args: rlArgs, Reply: rlReply}) if !testutils.IsError(err, "can not read consistently and skip intents") { t.Fatalf("wanted specific error, not %s", err) From 0451b79148b31e471202f6996c1a729f268c5dd7 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 14:51:31 -0400 Subject: [PATCH 2/7] addAdminCmd --- storage/client_merge_test.go | 10 +++---- storage/client_raft_test.go | 15 ++++------ storage/range.go | 53 ++++++++++++++++++++---------------- storage/range_command.go | 51 +++++++++++++++++----------------- 4 files changed, 65 insertions(+), 64 deletions(-) diff --git a/storage/client_merge_test.go b/storage/client_merge_test.go index f48aecae8aca..c1f1ccc1c135 100644 --- a/storage/client_merge_test.go +++ b/storage/client_merge_test.go @@ -20,7 +20,6 @@ package storage_test import ( "bytes" "reflect" - "strings" "testing" "golang.org/x/net/context" @@ -29,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/proto" "github.com/cockroachdb/cockroach/storage" "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/testutils" "github.com/cockroachdb/cockroach/util/leaktest" "github.com/cockroachdb/cockroach/util/log" ) @@ -253,11 +253,9 @@ func TestStoreRangeMergeNonConsecutive(t *testing.T) { t.Fatal(err) } - argsMerge, replyMerge := adminMergeArgs(rangeA.Desc().StartKey, 1, store.StoreID()) - rangeA.AdminMerge(argsMerge, replyMerge) - if replyMerge.Error == nil || - !strings.Contains(replyMerge.Error.Error(), "ranges not collocated") { - t.Fatalf("did not got expected error; got %s", replyMerge.Error) + argsMerge, _ := adminMergeArgs(rangeA.Desc().StartKey, 1, store.StoreID()) + if _, err := rangeA.AdminMerge(argsMerge); !testutils.IsError(err, "ranges not collocated") { + t.Fatalf("did not got expected error; got %s", err) } // Re-add the range. This is necessary for a clean shutdown. diff --git a/storage/client_raft_test.go b/storage/client_raft_test.go index b2bd9c22a105..4bb5e4e84481 100644 --- a/storage/client_raft_test.go +++ b/storage/client_raft_test.go @@ -788,11 +788,10 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) { if rng == nil { t.Fatal("failed to look up min range") } - args, reply := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), rng.Desc().RaftID, + args, _ := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), rng.Desc().RaftID, mtc.stores[0].StoreID()) - rng.AdminSplit(args, reply) - if reply.GoError() != nil { - t.Fatal(reply.GoError()) + if _, err := rng.AdminSplit(args); err != nil { + t.Fatal(err) } } @@ -802,11 +801,9 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) { if rng == nil { t.Fatal("failed to look up max range") } - args, reply := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), rng.Desc().RaftID, - mtc.stores[0].StoreID()) - rng.AdminSplit(args, reply) - if reply.GoError() != nil { - t.Fatal(reply.GoError()) + args, _ := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), rng.Desc().RaftID, mtc.stores[0].StoreID()) + if _, err := rng.AdminSplit(args); err != nil { + t.Fatal(err) } } } diff --git a/storage/range.go b/storage/range.go index dbdd59454198..0d8a03d9eeef 100644 --- a/storage/range.go +++ b/storage/range.go @@ -507,29 +507,37 @@ func (r *Range) SetLastVerificationTimestamp(timestamp proto.Timestamp) error { // either along the read-only execution path or the read-write Raft // command queue. func (r *Range) AddCmd(ctx context.Context, call proto.Call) error { - args, cReply := call.Args, call.Reply + args := call.Args // TODO(tschottdorf) Some (internal) requests go here directly, so they // won't be traced. trace := tracer.FromCtx(ctx) // Differentiate between admin, read-only and read-write. + var reply proto.Response + var err error if proto.IsAdmin(args) { defer trace.Epoch("admin path")() - return r.addAdminCmd(ctx, args, cReply) + reply, err = r.addAdminCmd(ctx, args) } else if proto.IsReadOnly(args) { defer trace.Epoch("read path")() - reply, err := r.addReadOnlyCmd(ctx, args) - if reply != nil { - gogoproto.Merge(cReply, reply) - } - if err != nil { - if cReply.Header().Error != nil { - panic("the world is on fire") - } - cReply.Header().SetGoError(err) + reply, err = r.addReadOnlyCmd(ctx, args) + } else if proto.IsWrite(args) { + return r.addWriteCmd(ctx, args, call.Reply, nil) + } else { + panic(fmt.Sprintf("don't know how to handle command %T", args)) + } + + if reply != nil { + gogoproto.Merge(call.Reply, reply) + } + + if err != nil { + if call.Reply.Header().Error != nil { + panic("the world is on fire") } - return err + call.Reply.Header().SetGoError(err) } - return r.addWriteCmd(ctx, args, cReply, nil) + + return err } func (r *Range) checkCmdHeader(header *proto.RequestHeader) error { @@ -577,29 +585,28 @@ func (r *Range) endCmd(cmdKey interface{}, args proto.Request, err error, readOn // with the command queue or the timestamp cache, as admin commands // are not meant to consistently access or modify the underlying data. // Admin commands must run on the leader replica. -func (r *Range) addAdminCmd(ctx context.Context, args proto.Request, reply proto.Response) error { +func (r *Range) addAdminCmd(ctx context.Context, args proto.Request) (proto.Response, error) { header := args.Header() if err := r.checkCmdHeader(header); err != nil { - reply.Header().SetGoError(err) - return err + return nil, err } // Admin commands always require the leader lease. if err := r.redirectOnOrAcquireLeaderLease(tracer.FromCtx(ctx), header.Timestamp); err != nil { - reply.Header().SetGoError(err) - return err + return nil, err } - switch args.(type) { + switch tArgs := args.(type) { case *proto.AdminSplitRequest: - r.AdminSplit(args.(*proto.AdminSplitRequest), reply.(*proto.AdminSplitResponse)) + resp, err := r.AdminSplit(tArgs) + return &resp, err case *proto.AdminMergeRequest: - r.AdminMerge(args.(*proto.AdminMergeRequest), reply.(*proto.AdminMergeResponse)) + resp, err := r.AdminMerge(tArgs) + return &resp, err default: - return util.Error("unrecognized admin command") + return nil, util.Error("unrecognized admin command") } - return reply.Header().GoError() } // addReadOnlyCmd updates the read timestamp cache and waits for any diff --git a/storage/range_command.go b/storage/range_command.go index 5bf61db3a667..8d8438be0ca8 100644 --- a/storage/range_command.go +++ b/storage/range_command.go @@ -836,7 +836,9 @@ func (r *Range) InternalLeaderLease(batch engine.Engine, ms *engine.MVCCStats, a // updates the range addressing metadata. The handover of responsibility for // the reassigned key range is carried out seamlessly through a split trigger // carried out as part of the commit of that transaction. -func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSplitResponse) { +func (r *Range) AdminSplit(args *proto.AdminSplitRequest) (proto.AdminSplitResponse, error) { + var reply proto.AdminSplitResponse + // Only allow a single split per range at a time. r.metaLock.Lock() defer r.metaLock.Unlock() @@ -849,34 +851,30 @@ func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSpli if len(splitKey) == 0 { snap := r.rm.NewSnapshot() defer snap.Close() - var err error - if splitKey, err = engine.MVCCFindSplitKey(snap, desc.RaftID, desc.StartKey, desc.EndKey); err != nil { - reply.SetGoError(util.Errorf("unable to determine split key: %s", err)) - return + foundSplitKey, err := engine.MVCCFindSplitKey(snap, desc.RaftID, desc.StartKey, desc.EndKey) + if err != nil { + return reply, util.Errorf("unable to determine split key: %s", err) } + splitKey = foundSplitKey } // First verify this condition so that it will not return // proto.NewRangeKeyMismatchError if splitKey equals to desc.EndKey, // otherwise it will cause infinite retry loop. if splitKey.Equal(desc.StartKey) || splitKey.Equal(desc.EndKey) { - reply.SetGoError(util.Errorf("range is already split at key %s", splitKey)) - return + return reply, util.Errorf("range is already split at key %s", splitKey) } // Verify some properties of split key. if !r.ContainsKey(splitKey) { - reply.SetGoError(proto.NewRangeKeyMismatchError(splitKey, splitKey, desc)) - return + return reply, proto.NewRangeKeyMismatchError(splitKey, splitKey, desc) } if !engine.IsValidSplitKey(splitKey) { - reply.SetGoError(util.Errorf("cannot split range at key %s", splitKey)) - return + return reply, util.Errorf("cannot split range at key %s", splitKey) } // Create new range descriptor with newly-allocated replica IDs and Raft IDs. newDesc, err := r.rm.NewRangeDescriptor(splitKey, desc.EndKey, desc.Replicas) if err != nil { - reply.SetGoError(util.Errorf("unable to allocate new range descriptor: %s", err)) - return + return reply, util.Errorf("unable to allocate new range descriptor: %s", err) } // Init updated version of existing range descriptor. @@ -885,7 +883,7 @@ func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSpli log.Infof("initiating a split of %s at key %s", r, splitKey) - if err = r.rm.DB().Txn(func(txn *client.Txn) error { + if err := r.rm.DB().Txn(func(txn *client.Txn) error { // Create range descriptor for second half of split. // Note that this put must go first in order to locate the // transaction record on the correct range. @@ -929,8 +927,10 @@ func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSpli }) return txn.Run(b) }); err != nil { - reply.SetGoError(util.Errorf("split at key %s failed: %s", splitKey, err)) + return reply, util.Errorf("split at key %s failed: %s", splitKey, err) } + + return reply, nil } // splitTrigger is called on a successful commit of an AdminSplit @@ -1024,7 +1024,9 @@ func (r *Range) splitTrigger(batch engine.Engine, split *proto.SplitTrigger) err // the reassigned key range is carried out seamlessly through a merge trigger // carried out as part of the commit of that transaction. // A merge requires that the two ranges are collocate on the same set of replicas. -func (r *Range) AdminMerge(args *proto.AdminMergeRequest, reply *proto.AdminMergeResponse) { +func (r *Range) AdminMerge(args *proto.AdminMergeRequest) (proto.AdminMergeResponse, error) { + var reply proto.AdminMergeResponse + // Only allow a single split/merge per range at a time. r.metaLock.Lock() defer r.metaLock.Unlock() @@ -1033,27 +1035,23 @@ func (r *Range) AdminMerge(args *proto.AdminMergeRequest, reply *proto.AdminMerg desc := r.Desc() if desc.EndKey.Equal(proto.KeyMax) { // Noop. - return + return reply, nil } subsumedRng := r.rm.LookupRange(desc.EndKey, nil) if subsumedRng == nil { - reply.SetGoError(util.Errorf("ranges not collocated; migration of ranges in anticipation of merge not yet implemented")) - return + return reply, util.Errorf("ranges not collocated; migration of ranges in anticipation of merge not yet implemented") } subsumedDesc := subsumedRng.Desc() // Make sure the range being subsumed follows this one. if !bytes.Equal(desc.EndKey, subsumedDesc.StartKey) { - reply.SetGoError(util.Errorf("Ranges that are not adjacent cannot be merged, %s != %s", - desc.EndKey, subsumedDesc.StartKey)) - return + return reply, util.Errorf("Ranges that are not adjacent cannot be merged, %s != %s", desc.EndKey, subsumedDesc.StartKey) } // Ensure that both ranges are collocate by intersecting the store ids from // their replicas. if !replicaSetsEqual(subsumedDesc.GetReplicas(), desc.GetReplicas()) { - reply.SetGoError(util.Error("The two ranges replicas are not collocate")) - return + return reply, util.Error("The two ranges replicas are not collocate") } // Init updated version of existing range descriptor. @@ -1097,9 +1095,10 @@ func (r *Range) AdminMerge(args *proto.AdminMergeRequest, reply *proto.AdminMerg }) return txn.Run(b) }); err != nil { - reply.SetGoError(util.Errorf("merge of range %d into %d failed: %s", - subsumedDesc.RaftID, desc.RaftID, err)) + return reply, util.Errorf("merge of range %d into %d failed: %s", subsumedDesc.RaftID, desc.RaftID, err) } + + return reply, nil } // mergeTrigger is called on a successful commit of an AdminMerge From 4eb7006d36265c09ca985567c271678c3f4a7395 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 15:05:27 -0400 Subject: [PATCH 3/7] addWriteCmd --- storage/range.go | 64 +++++++++++++++++++++++-------------------- storage/range_test.go | 14 ++++------ storage/store.go | 4 +-- 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/storage/range.go b/storage/range.go index 0d8a03d9eeef..11f0663bf34a 100644 --- a/storage/range.go +++ b/storage/range.go @@ -149,13 +149,17 @@ func usesTimestampCache(r proto.Request) bool { return tsCacheMethods[m] } -// A pendingCmd holds the reply buffer and a done channel for a command -// sent to Raft. Once committed to the Raft log, the command is -// executed and the result returned via the done channel. +type responseWithErr struct { + reply proto.Response + err error +} + +// A pendingCmd holds a done channel for a command sent to Raft. Once +// committed to the Raft log, the command is executed and the result returned +// via the done channel. type pendingCmd struct { - Reply proto.Response - ctx context.Context - done chan error // Used to signal waiting RPC handler + ctx context.Context + done chan responseWithErr // Used to signal waiting RPC handler } // A rangeManager is an interface satisfied by Store through which ranges @@ -354,11 +358,11 @@ func (r *Range) requestLeaderLease(timestamp proto.Timestamp) error { // checks from normal request machinery, (e.g. the command queue). // Note that the command itself isn't traced, but usually the caller // waiting for the result has an active Trace. - errChan, pendingCmd := r.proposeRaftCommand(r.context(), args, &proto.InternalLeaderLeaseResponse{}) + errChan, pendingCmd := r.proposeRaftCommand(r.context(), args) var err error if err = <-errChan; err == nil { // Next if the command was committed, wait for the range to apply it. - err = <-pendingCmd.done + err = (<-pendingCmd.done).err } return err } @@ -521,7 +525,7 @@ func (r *Range) AddCmd(ctx context.Context, call proto.Call) error { defer trace.Epoch("read path")() reply, err = r.addReadOnlyCmd(ctx, args) } else if proto.IsWrite(args) { - return r.addWriteCmd(ctx, args, call.Reply, nil) + reply, err = r.addWriteCmd(ctx, args, nil) } else { panic(fmt.Sprintf("don't know how to handle command %T", args)) } @@ -531,10 +535,14 @@ func (r *Range) AddCmd(ctx context.Context, call proto.Call) error { } if err != nil { - if call.Reply.Header().Error != nil { - panic("the world is on fire") + replyHeader := call.Reply.Header() + // Ideally we wouldn't allow any error to be set here, but in the + // `addWriteCmd` case these replies go through the raft machinery + // which hasn't been audited yet. + if replyHeader.Error != nil && replyHeader.Error.Error() != replyHeader.GoError().Error() { + panic(fmt.Sprintf("cannot set error to %s, already set to %s", err, replyHeader.GoError())) } - call.Reply.Header().SetGoError(err) + replyHeader.SetGoError(err) } return err @@ -667,7 +675,7 @@ func (r *Range) addReadOnlyCmd(ctx context.Context, args proto.Request) (proto.R // error returned. If a WaitGroup is supplied, it is signaled when the command // enters Raft or the function returns with a preprocessing error, whichever // happens earlier. -func (r *Range) addWriteCmd(ctx context.Context, args proto.Request, reply proto.Response, wg *sync.WaitGroup) error { +func (r *Range) addWriteCmd(ctx context.Context, args proto.Request, wg *sync.WaitGroup) (proto.Response, error) { signal := func() { if wg != nil { wg.Done() @@ -682,8 +690,7 @@ func (r *Range) addWriteCmd(ctx context.Context, args proto.Request, reply proto header := args.Header() if err := r.checkCmdHeader(args.Header()); err != nil { - reply.Header().SetGoError(err) - return err + return nil, err } trace := tracer.FromCtx(ctx) @@ -700,8 +707,7 @@ func (r *Range) addWriteCmd(ctx context.Context, args proto.Request, reply proto // This replica must have leader lease to process a write. if err := r.redirectOnOrAcquireLeaderLease(trace, header.Timestamp); err != nil { r.endCmd(cmdKey, args, err, false /* !readOnly */) - reply.Header().SetGoError(err) - return err + return nil, err } // Two important invariants of Cockroach: 1) encountering a more @@ -737,15 +743,17 @@ func (r *Range) addWriteCmd(ctx context.Context, args proto.Request, reply proto defer trace.Epoch("raft")() - errChan, pendingCmd := r.proposeRaftCommand(ctx, args, reply) + errChan, pendingCmd := r.proposeRaftCommand(ctx, args) signal() // First wait for raft to commit or abort the command. var err error + var reply proto.Response if err = <-errChan; err == nil { // Next if the command was committed, wait for the range to apply it. - err = <-pendingCmd.done + respWithErr := <-pendingCmd.done + reply, err = respWithErr.reply, respWithErr.err } else if err == multiraft.ErrGroupDeleted { // This error needs to be converted appropriately so that // clients will retry. @@ -755,18 +763,17 @@ func (r *Range) addWriteCmd(ctx context.Context, args proto.Request, reply proto // of this write on success. This ensures a strictly higher // timestamp for successive writes to the same key or key range. r.endCmd(cmdKey, args, err, false /* !readOnly */) - return err + return reply, err } // proposeRaftCommand prepares necessary pending command struct and // initializes a client command ID if one hasn't been. It then // proposes the command to Raft and returns the error channel and // pending command struct for receiving. -func (r *Range) proposeRaftCommand(ctx context.Context, args proto.Request, reply proto.Response) (<-chan error, *pendingCmd) { +func (r *Range) proposeRaftCommand(ctx context.Context, args proto.Request) (<-chan error, *pendingCmd) { pendingCmd := &pendingCmd{ - ctx: ctx, - Reply: reply, - done: make(chan error, 1), + ctx: ctx, + done: make(chan responseWithErr, 1), } raftCmd := proto.InternalRaftCommand{ RaftID: r.Desc().RaftID, @@ -804,16 +811,15 @@ func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.I var reply proto.Response var ctx context.Context if cmd != nil { - // We initiated this command, so use the caller-supplied reply. - reply = cmd.Reply + // We initiated this command, so use the caller-supplied context. ctx = cmd.ctx } else { - // This command originated elsewhere so we must create a new reply buffer. - reply = args.CreateReply() // TODO(tschottdorf): consider the Trace situation here. ctx = r.context() } + reply = args.CreateReply() + execDone := tracer.FromCtx(ctx).Epoch(fmt.Sprintf("applying %s", args.Method())) // applyRaftCommand will return "expected" errors, but may also indicate // replica corruption (as of now, signaled by a replicaCorruptionError). @@ -824,7 +830,7 @@ func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.I execDone() if cmd != nil { - cmd.done <- err + cmd.done <- responseWithErr{reply, err} } else if err != nil && log.V(1) { log.Errorc(r.context(), "error executing raft command %s: %s", args.Method(), err) } diff --git a/storage/range_test.go b/storage/range_test.go index 9f10d143655f..e0907e8062cb 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -282,12 +282,11 @@ func TestRangeContains(t *testing.T) { func setLeaderLease(t *testing.T, r *Range, l *proto.Lease) { args := &proto.InternalLeaderLeaseRequest{Lease: *l} - reply := &proto.InternalLeaderLeaseResponse{} - errChan, pendingCmd := r.proposeRaftCommand(r.context(), args, reply) + errChan, pendingCmd := r.proposeRaftCommand(r.context(), args) var err error if err = <-errChan; err == nil { // Next if the command was committed, wait for the range to apply it. - err = <-pendingCmd.done + err = (<-pendingCmd.done).err } if err != nil { t.Errorf("failed to set lease: %s", err) @@ -360,7 +359,7 @@ func TestApplyCmdLeaseError(t *testing.T) { tc.Start(t) defer tc.Stop() - pArgs, pReply := putArgs(proto.Key("a"), []byte("asd"), + pArgs, _ := putArgs(proto.Key("a"), []byte("asd"), tc.rng.Desc().RaftID, tc.store.StoreID()) pArgs.Timestamp = tc.clock.Now() @@ -374,18 +373,15 @@ func TestApplyCmdLeaseError(t *testing.T) { }) // Submit a proposal to Raft. - errChan, pendingCmd := tc.rng.proposeRaftCommand(tc.rng.context(), pArgs, pReply) + errChan, pendingCmd := tc.rng.proposeRaftCommand(tc.rng.context(), pArgs) if err := <-errChan; err != nil { t.Fatal(err) } - if err := <-pendingCmd.done; err == nil { + if err := (<-pendingCmd.done).err; err == nil { t.Fatalf("expected an error") } else if _, ok := err.(*proto.NotLeaderError); !ok { t.Fatalf("expected not leader error in return, got %s", err) } - if _, ok := pReply.GoError().(*proto.NotLeaderError); !ok { - t.Errorf("expected not leader error in reply header; got %s", pReply.GoError()) - } } func TestRangeRangeBoundsChecking(t *testing.T) { diff --git a/storage/store.go b/storage/store.go index 44f74465482c..97116f15696b 100644 --- a/storage/store.go +++ b/storage/store.go @@ -1412,13 +1412,11 @@ func (s *Store) resolveWriteIntentError(ctx context.Context, wiErr *proto.WriteI Txn: pushReply.PusheeTxn, }, } - resolveReply := &proto.InternalResolveIntentResponse{} wg.Add(1) ctx := tracer.ToCtx(ctx, trace.Fork()) if !s.stopper.RunAsyncTask(func() { - resolveErr := rng.addWriteCmd(ctx, resolveArgs, resolveReply, &wg) - if resolveErr != nil && log.V(1) { + if _, resolveErr := rng.addWriteCmd(ctx, resolveArgs, &wg); resolveErr != nil && log.V(1) { log.Warningc(ctx, "resolve for key %s failed: %s", intentKey, resolveErr) } }) { From c13ab7654071e31ecea4ae9ce3a35d934d530f03 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 16:37:13 -0400 Subject: [PATCH 4/7] Trace the rw path --- storage/range.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/range.go b/storage/range.go index 11f0663bf34a..b388c4d5b7bf 100644 --- a/storage/range.go +++ b/storage/range.go @@ -522,9 +522,10 @@ func (r *Range) AddCmd(ctx context.Context, call proto.Call) error { defer trace.Epoch("admin path")() reply, err = r.addAdminCmd(ctx, args) } else if proto.IsReadOnly(args) { - defer trace.Epoch("read path")() + defer trace.Epoch("read-only path")() reply, err = r.addReadOnlyCmd(ctx, args) } else if proto.IsWrite(args) { + defer trace.Epoch("read-write path")() reply, err = r.addWriteCmd(ctx, args, nil) } else { panic(fmt.Sprintf("don't know how to handle command %T", args)) From 5bddbb13124afa0f24d563f8bf3e41b9ce5a5b01 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 15:13:30 -0400 Subject: [PATCH 5/7] applyRaftCommandInBatch --- storage/range.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/storage/range.go b/storage/range.go index b388c4d5b7bf..c0836102e045 100644 --- a/storage/range.go +++ b/storage/range.go @@ -857,7 +857,10 @@ func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode p // Call the helper, which returns a batch containing data written // during command execution and any associated error. ms := engine.MVCCStats{} - batch, rErr := r.applyRaftCommandInBatch(ctx, index, originNode, args, reply, &ms) + batch, batchReply, rErr := r.applyRaftCommandInBatch(ctx, index, originNode, args, &ms) + if batchReply != nil { + gogoproto.Merge(reply, batchReply) + } // ALWAYS set the reply header error to the error returned by the // helper. This is the definitive result of the execution. The // error must be set before saving to the response cache. @@ -900,7 +903,7 @@ func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode p // returns the batch containing the results. The caller is responsible // for committing the batch, even on error. func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, originNode proto.RaftNodeID, - args proto.Request, reply proto.Response, ms *engine.MVCCStats) (engine.Engine, error) { + args proto.Request, ms *engine.MVCCStats) (engine.Engine, proto.Response, error) { // Create a new batch for the command to ensure all or nothing semantics. batch := r.rm.Engine().NewBatch() @@ -922,31 +925,26 @@ func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, origi // same ClientCmdID and would get the distributed sender stuck in an // infinite loop, retrieving a stale NotLeaderError over and over // again, even when proposing at the correct replica. - return batch, r.newNotLeaderError(lease, originNode) + return batch, nil, r.newNotLeaderError(lease, originNode) } // Check the response cache to ensure idempotency. if proto.IsWrite(args) { - if cachedReply, err := r.respCache.GetResponse(batch, args.Header().CmdID); err != nil { + if reply, err := r.respCache.GetResponse(batch, args.Header().CmdID); err != nil { // Any error encountered while fetching the response cache entry means corruption. - return batch, newReplicaCorruptionError(util.Errorf("could not read from response cache"), err) - } else if cachedReply != nil { - // TODO(tamird): this shouldn't be needed - gogoproto.Merge(reply, cachedReply) + return batch, reply, newReplicaCorruptionError(util.Errorf("could not read from response cache"), err) + } else if reply != nil { if log.V(1) { log.Infoc(ctx, "found response cache entry for %+v", args.Header().CmdID) } // We successfully read from the response cache, so return whatever error // was present in the cached entry (if any). - return batch, cachedReply.Header().GoError() + return batch, reply, reply.Header().GoError() } } // Execute the command. - executedReply, intents, rErr := r.executeCmd(batch, ms, args) - executedReply.Header().SetGoError(rErr) - reply.Reset() - gogoproto.Merge(reply, executedReply) + reply, intents, rErr := r.executeCmd(batch, ms, args) // Regardless of error, add result to the response cache if this is // a write method. This must be done as part of the execution of // raft commands so that every replica maintains the same responses @@ -963,14 +961,20 @@ func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, origi batch.Close() batch = r.rm.Engine().NewBatch() } + // TODO(tamird): remove this when the response cache can handle these errors itself + if reply.Header().Error != nil { + panic("the world is on fire") + } + reply.Header().SetGoError(rErr) if err := r.respCache.PutResponse(batch, args.Header().CmdID, reply); err != nil { log.Fatalc(ctx, "putting a response cache entry in a batch should never fail: %s", err) } + reply.Header().Error = nil } // If the execution of the command wasn't successful, stop here. if rErr != nil { - return batch, rErr + return batch, reply, rErr } // On success and only on the replica on which this command originated, @@ -979,7 +983,7 @@ func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, origi r.handleSkippedIntents(args, intents) } - return batch, nil + return batch, reply, nil } // getLeaseForGossip tries to obtain a leader lease. Only one of the replicas From 8d1706698556ac64b81fda30a9b95f75dedfe304 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 16:48:15 -0400 Subject: [PATCH 6/7] Move the TODO up a level --- storage/range.go | 5 ++++- storage/range_command.go | 6 ++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/storage/range.go b/storage/range.go index c0836102e045..cf0e9b487c5b 100644 --- a/storage/range.go +++ b/storage/range.go @@ -961,7 +961,10 @@ func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, origi batch.Close() batch = r.rm.Engine().NewBatch() } - // TODO(tamird): remove this when the response cache can handle these errors itself + // TODO(tamird): move this into the response cache itself + if reply == nil { + reply = args.CreateReply() + } if reply.Header().Error != nil { panic("the world is on fire") } diff --git a/storage/range_command.go b/storage/range_command.go index 8d8438be0ca8..1601d81224be 100644 --- a/storage/range_command.go +++ b/storage/range_command.go @@ -45,15 +45,13 @@ func (r *Range) executeCmd(batch engine.Engine, ms *engine.MVCCStats, args proto header := args.Header() if err := r.checkCmdHeader(header); err != nil { - // TODO(tamird): Remove the CreateReply when upstream doesn't need it. - return args.CreateReply(), nil, err + return nil, nil, err } // If a unittest filter was installed, check for an injected error; otherwise, continue. if TestingCommandFilter != nil { if err := TestingCommandFilter(args); err != nil { - // TODO(tamird): Remove the CreateReply when upstream doesn't need it. - return args.CreateReply(), nil, err + return nil, nil, err } } From 140d8558ce8c2ab6897993d718ed363eff54e3ac Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 16 Jul 2015 16:53:30 -0400 Subject: [PATCH 7/7] applyRaftCommand --- storage/range.go | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/storage/range.go b/storage/range.go index cf0e9b487c5b..d4d21d6b8d9c 100644 --- a/storage/range.go +++ b/storage/range.go @@ -537,11 +537,8 @@ func (r *Range) AddCmd(ctx context.Context, call proto.Call) error { if err != nil { replyHeader := call.Reply.Header() - // Ideally we wouldn't allow any error to be set here, but in the - // `addWriteCmd` case these replies go through the raft machinery - // which hasn't been audited yet. - if replyHeader.Error != nil && replyHeader.Error.Error() != replyHeader.GoError().Error() { - panic(fmt.Sprintf("cannot set error to %s, already set to %s", err, replyHeader.GoError())) + if replyHeader.Error != nil { + panic("the world is on fire") } replyHeader.SetGoError(err) } @@ -819,15 +816,12 @@ func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.I ctx = r.context() } - reply = args.CreateReply() - execDone := tracer.FromCtx(ctx).Epoch(fmt.Sprintf("applying %s", args.Method())) // applyRaftCommand will return "expected" errors, but may also indicate // replica corruption (as of now, signaled by a replicaCorruptionError). // We feed its return through maybeSetCorrupt to act when that happens. - err := r.maybeSetCorrupt( - r.applyRaftCommand(ctx, index, proto.RaftNodeID(raftCmd.OriginNodeID), args, reply), - ) + reply, err := r.applyRaftCommand(ctx, index, proto.RaftNodeID(raftCmd.OriginNodeID), args) + err = r.maybeSetCorrupt(err) execDone() if cmd != nil { @@ -843,7 +837,7 @@ func (r *Range) processRaftCommand(idKey cmdIDKey, index uint64, raftCmd proto.I // underlying state machine (i.e. the engine). // When certain critical operations fail, a replicaCorruptionError may be // returned and must be handled by the caller. -func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode proto.RaftNodeID, args proto.Request, reply proto.Response) error { +func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode proto.RaftNodeID, args proto.Request) (proto.Response, error) { if index <= 0 { log.Fatalc(ctx, "raft command index is <= 0") } @@ -851,20 +845,13 @@ func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode p // If we have an out of order index, there's corruption. No sense in trying // to update anything or run the command. Simply return a corruption error. if oldIndex := atomic.LoadUint64(&r.appliedIndex); oldIndex >= index { - return newReplicaCorruptionError(util.Errorf("applied index moved backwards: %d >= %d", oldIndex, index)) + return nil, newReplicaCorruptionError(util.Errorf("applied index moved backwards: %d >= %d", oldIndex, index)) } // Call the helper, which returns a batch containing data written // during command execution and any associated error. ms := engine.MVCCStats{} - batch, batchReply, rErr := r.applyRaftCommandInBatch(ctx, index, originNode, args, &ms) - if batchReply != nil { - gogoproto.Merge(reply, batchReply) - } - // ALWAYS set the reply header error to the error returned by the - // helper. This is the definitive result of the execution. The - // error must be set before saving to the response cache. - reply.Header().SetGoError(rErr) + batch, reply, rErr := r.applyRaftCommandInBatch(ctx, index, originNode, args, &ms) defer batch.Close() // Advance the last applied index and commit the batch. @@ -896,7 +883,7 @@ func (r *Range) applyRaftCommand(ctx context.Context, index uint64, originNode p } } - return rErr + return reply, rErr } // applyRaftCommandInBatch executes the command in a batch engine and @@ -937,6 +924,8 @@ func (r *Range) applyRaftCommandInBatch(ctx context.Context, index uint64, origi if log.V(1) { log.Infoc(ctx, "found response cache entry for %+v", args.Header().CmdID) } + // TODO(tamird): move this into the response cache itself + defer func() { reply.Header().Error = nil }() // We successfully read from the response cache, so return whatever error // was present in the cached entry (if any). return batch, reply, reply.Header().GoError()