Skip to content

Commit

Permalink
storage: Remove intentResolver optimization for local Replica
Browse files Browse the repository at this point in the history
This optimization uses direct access to the Replica object,
which is unusual and complicates the Store/Replica concurrency
model. It's cleaner to use the DistSender for both local and
remote intents, although it means we can no longer detect when
a command has been proposed, and must instead wait for it to be
completed.

Fixes cockroachdb#7933
  • Loading branch information
bdarnell committed Aug 7, 2016
1 parent bee9130 commit d41dc72
Showing 1 changed file with 20 additions and 68 deletions.
88 changes: 20 additions & 68 deletions storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package storage

import (
"sync"

"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/keys"
Expand All @@ -29,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/util/syncutil"
"github.com/cockroachdb/cockroach/util/tracing"
"github.com/cockroachdb/cockroach/util/uuid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -354,17 +351,21 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
}
}

// resolveIntents resolves the given intents. For those which are
// local to the range, we submit directly to the local Raft instance;
// all non-local intents are resolved asynchronously in a batch. If
// `wait` is true, all operations are carried out synchronously and an
// error is returned. Otherwise, the call returns without error as
// soon as all local resolve commands have been **proposed** (not
// executed). This ensures that if a waiting client retries
// immediately after calling this function, it will not hit the same
// intents again.
// resolveIntents resolves the given intents. `wait` is currently a
// no-op; all intents are resolved synchronously.
//
// TODO(bdarnell): Restore the wait=false optimization when/if #8360
// is fixed. `wait=false` is meant to request a semi-synchronous
// operation that returns once all local commands have been *proposed*
// but not yet committed or executed. That ensures that a waiting
// client which retries immediately after calling this function will
// not hit the same intents again. Until #8360 is fixed, we provide
// this guarantee by resolving the intents synchronously regardless
// of the `wait` argument.
func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica,
intents []roachpb.Intent, wait bool, poison bool) error {
// Force synchronous operation; see above TODO.
wait = true
if len(intents) == 0 {
return nil
}
Expand All @@ -373,13 +374,10 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica,
defer cleanup()
log.Tracef(ctx, "resolving intents [wait=%t]", wait)

var reqsRemote []roachpb.Request
baLocal := roachpb.BatchRequest{}
baLocal.Timestamp = ir.store.Clock().Now()
var reqs []roachpb.Request
for i := range intents {
intent := intents[i] // avoids a race in `i, intent := range ...`
var resolveArgs roachpb.Request
var local bool // whether this intent lives on this Range
{
if len(intent.EndKey) == 0 {
resolveArgs = &roachpb.ResolveIntentRequest{
Expand All @@ -388,66 +386,23 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica,
Status: intent.Status,
Poison: poison,
}
local = r.ContainsKey(intent.Key)
} else {
resolveArgs = &roachpb.ResolveIntentRangeRequest{
Span: intent.Span,
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: poison,
}
local = r.ContainsKeyRange(intent.Key, intent.EndKey)
}
}

// If the intent isn't (completely) local, we'll need to send an external request.
// We'll batch them all up and send at the end.
if local {
baLocal.Add(resolveArgs)
} else {
reqsRemote = append(reqsRemote, resolveArgs)
}
}

// The local batch goes directly to Raft.
var wg sync.WaitGroup
if len(baLocal.Requests) > 0 {
action := func(ctx context.Context) error {
// Always operate with a timeout when resolving intents: this
// prevents rare shutdown timeouts in tests.
ctxWithTimeout, cancel := context.WithTimeout(ctx, base.NetworkTimeout)
defer cancel()
_, pErr := r.addWriteCmd(ctxWithTimeout, baLocal, &wg)
return pErr.GoError()
}
wg.Add(1)
if wait || r.store.Stopper().RunLimitedAsyncTask(ir.sem, func() {
// Trace this under the ID of the intent owner.
// Create a new span though, since we do not want to pass a span
// between goroutines or we risk use-after-finish.
sp := r.store.Tracer().StartSpan("resolve intents")
defer sp.Finish()
spanCtx := opentracing.ContextWithSpan(ctx, sp)

if err := action(spanCtx); err != nil {
log.Warningf(spanCtx, "unable to resolve local intents; %s", err)
}
}) != nil {
// Still run the task when draining. Our caller already has a task and
// going async here again is merely for performance, but some intents
// need to be resolved because they might block other tasks. See #1684.
// Note that handleSkippedIntents has a TODO in case #1684 comes back.
// TODO(tschottdorf): This is ripe for removal.
if err := action(ctx); err != nil {
return err
}
}
reqs = append(reqs, resolveArgs)
}

// Resolve all of the intents which aren't local to the Range.
if len(reqsRemote) > 0 {
// Resolve all of the intents.
if len(reqs) > 0 {
b := &client.Batch{}
b.AddRawRequest(reqsRemote...)
b.AddRawRequest(reqs...)
action := func() error {
// TODO(tschottdorf): no tracing here yet.
return r.store.DB().Run(b)
Expand All @@ -457,17 +412,14 @@ func (ir *intentResolver) resolveIntents(ctx context.Context, r *Replica,
log.Warningf(ctx, "unable to resolve external intents: %s", err)
}
}) != nil {
// As with local intents, try async to not keep the caller waiting, but
// when draining just go ahead and do it synchronously. See #1684.
// Try async to not keep the caller waiting, but when draining
// just go ahead and do it synchronously. See #1684.
// TODO(tschottdorf): This is ripe for removal.
if err := action(); err != nil {
return err
}
}
}

// Wait until the local ResolveIntents batch has been submitted to
// raft. No-op if all were non-local.
wg.Wait()
return nil
}

0 comments on commit d41dc72

Please sign in to comment.