Skip to content

Commit

Permalink
WIP on TestDrainingProcessorSwallowsUncertaintyError
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Aug 2, 2019
1 parent fa5f826 commit 4631246
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions pkg/sql/distsqlrun/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"

Expand All @@ -32,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/jackc/pgx"
)

Expand Down Expand Up @@ -486,7 +488,7 @@ func TestDrainingProcessorSwallowsUncertaintyError(t *testing.T) {
defer leaktest.AfterTest(t)()

// We're going to test by running a query that selects rows 1..10 with limit
// 5. Out of these, rows 1..5 are on node 2, 6..10 on node 1. We're going to
// 5. Out of these, rows 1..5 are on node 1, 6..10 on node 2. We're going to
// block the read on node 1 until the client gets the 5 rows from node 2. Then
// we're going to inject an uncertainty error in the blocked read. The point
// of the test is to check that the error is swallowed, because the processor
Expand All @@ -501,8 +503,12 @@ func TestDrainingProcessorSwallowsUncertaintyError(t *testing.T) {
// trapRead is set, atomically, once the test wants to block a read on the
// first node.
var trapRead int64
blockedRead := make(chan roachpb.BatchRequest)
unblockRead := make(chan *roachpb.Error)
// unblockRead is a condition variable that the read on the first node will
// block on. Once it is signaled, the first node will get an injected
// uncertainty error.
mu := &syncutil.Mutex{}
unblockRead := sync.NewCond(mu)
blockRead := true

tc := serverutils.StartTestCluster(t, 3, /* numNodes */
base.TestClusterArgs{
Expand All @@ -525,9 +531,17 @@ func TestDrainingProcessorSwallowsUncertaintyError(t *testing.T) {
}
key := req.(*roachpb.ScanRequest).Key.String()
endKey := req.(*roachpb.ScanRequest).EndKey.String()
if strings.Contains(key, "/1") && strings.Contains(endKey, "5/") {
blockedRead <- ba
return <-unblockRead
if strings.Contains(key, "53/1") && strings.Contains(endKey, "53/1/5/") {
mu.Lock()
for blockRead {
unblockRead.Wait()
}
mu.Unlock()
return roachpb.NewError(
roachpb.NewReadWithinUncertaintyIntervalError(
ba.Timestamp, /* readTs */
ba.Timestamp.Add(1, 0), /* existingTs */
ba.Txn))
}
return nil
},
Expand Down Expand Up @@ -621,12 +635,10 @@ func TestDrainingProcessorSwallowsUncertaintyError(t *testing.T) {
// After we've gotten all the rows from the second node, let the first node
// return an uncertainty error.
if n == 10 {
ba := <-blockedRead
unblockRead <- roachpb.NewError(
roachpb.NewReadWithinUncertaintyIntervalError(
ba.Timestamp, /* readTs */
ba.Timestamp.Add(1, 0), /* existingTs */
ba.Txn))
mu.Lock()
blockRead = false
mu.Unlock()
unblockRead.Signal()
}
}
err = rows.Err()
Expand Down

0 comments on commit 4631246

Please sign in to comment.