Skip to content

Commit

Permalink
Fix bugs related to best-effort queries (#3125)
Browse files Browse the repository at this point in the history
- Don't use a pending txn cache when running best effort queries to avoid returning uncommitted data. If it gets used, a best-effort query ends up reading uncommitted data from another txn's cache who owns the start ts that best-effort query is "borrowing."

- If no txns are going on in the system and we create new predicates by asking for them (case in v1.0 series), a best-effort query would block forever due to predicate checksum mismatch between `OracleDelta` and `MembershipState`. Fix that by sending out an empty `OracleDelta` from Zero after a tablet update.

- Add tests in counter tool to check for these edge cases.

Changes:

* Don't use a pending txn cache when running best effort queries to avoid returning uncommitted data.
* Fix a bug caused by best effort queries, where we add a new predicate to the system without doing any transactions. This causes the group checksum to mismatch and stay that way. The fix would push something to Oracle update channel so a new OracleDelta would be produced and sent out.
* Add tests to ensure that best effort queries work as expected.
  • Loading branch information
manishrjain committed Mar 13, 2019
1 parent 691b3b3 commit 7a3bb09
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 246 deletions.
4 changes: 3 additions & 1 deletion dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,9 @@ func run() {
}
}
otrace.ApplyConfig(otrace.Config{
DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)})
DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing),
MaxAnnotationEventsPerSpan: 64,
})

// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
Expand Down
29 changes: 19 additions & 10 deletions dgraph/cmd/counter/increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,31 @@ func queryCounter(txn *dgo.Txn, pred string) (Counter, error) {
return counter, nil
}

func process(dg *dgo.Dgraph, readOnly bool, pred string) (Counter, error) {
if readOnly {
txn := dg.NewReadOnlyTxn()
defer txn.Discard(nil)
return queryCounter(txn, pred)
func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) {
ro := conf.GetBool("ro")
be := conf.GetBool("be")
pred := conf.GetString("pred")
var txn *dgo.Txn

switch {
case be:
txn = dg.NewReadOnlyTxn().BestEffort()
case ro:
txn = dg.NewReadOnlyTxn()
default:
txn = dg.NewTxn()
}
defer txn.Discard(nil)

txn := dg.NewTxn()
counter, err := queryCounter(txn, pred)
if err != nil {
return Counter{}, err
}
counter.Val++
if be || ro {
return counter, nil
}

counter.Val++
var mu api.Mutation
if len(counter.Uid) == 0 {
counter.Uid = "_:new"
Expand All @@ -117,8 +128,6 @@ func run(conf *viper.Viper) {
addr := conf.GetString("addr")
waitDur := conf.GetDuration("wait")
num := conf.GetInt("num")
ro := conf.GetBool("ro")
pred := conf.GetString("pred")
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
Expand All @@ -127,7 +136,7 @@ func run(conf *viper.Viper) {
dg := dgo.NewDgraphClient(dc)

for num > 0 {
cnt, err := process(dg, ro, pred)
cnt, err := process(dg, conf)
now := time.Now().UTC().Format("0102 03:04:05.999")
if err != nil {
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
Expand Down
136 changes: 113 additions & 23 deletions dgraph/cmd/counter/increment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,48 @@ package counter

import (
"context"
"log"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

const N = 10
const pred = "counter"

func incrementInLoop(t *testing.T, dg *dgo.Dgraph, M int) int {
conf := viper.New()
conf.Set("pred", "counter.val")

var max int
for i := 0; i < M; i++ {
cnt, err := process(dg, conf)
if err != nil {
if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
// pass
} else {
t.Logf("Error while incrementing: %v\n", err)
}
} else {
if cnt.Val > max {
max = cnt.Val
}
}
}
t.Logf("Last value written by increment in loop: %d", max)
return max
}

func increment(t *testing.T, dg *dgo.Dgraph) int {
var max int
var mu sync.Mutex
Expand All @@ -51,26 +77,19 @@ func increment(t *testing.T, dg *dgo.Dgraph) int {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, false, pred)
if err != nil {
if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
// pass
} else {
t.Logf("Error while incrementing: %v\n", err)
}
} else {
storeMax(cnt.Val)
}
}
max := incrementInLoop(t, dg, N)
storeMax(max)
}()
}
wg.Wait()
return max
}

func read(t *testing.T, dg *dgo.Dgraph, expected int) {
cnt, err := process(dg, true, pred)
conf := viper.New()
conf.Set("pred", "counter.val")
conf.Set("ro", true)
cnt, err := process(dg, conf)
require.NoError(t, err)
ts := cnt.startTs
t.Logf("Readonly stage counter: %+v\n", cnt)
Expand All @@ -81,7 +100,7 @@ func read(t *testing.T, dg *dgo.Dgraph, expected int) {
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, true, pred)
cnt, err := process(dg, conf)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
Expand All @@ -94,14 +113,29 @@ func read(t *testing.T, dg *dgo.Dgraph, expected int) {
wg.Wait()
}

func TestIncrement(t *testing.T) {
conn, err := grpc.Dial("localhost:9180", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
func readBestEffort(t *testing.T, dg *dgo.Dgraph, pred string, M int) {
conf := viper.New()
conf.Set("pred", pred)
conf.Set("be", true)
var last int
for i := 0; i < M; i++ {
cnt, err := process(dg, conf)
if err != nil {
t.Errorf("Error while reading: %v", err)
} else {
if last > cnt.Val {
t.Errorf("Current %d < Last %d", cnt.Val, last)
}
last = cnt.Val
}
}
dc := api.NewDgraphClient(conn)
dg := dgo.NewDgraphClient(dc)
t.Logf("Last value read by best effort: %d", last)
}

func setup(t *testing.T) *dgo.Dgraph {
dg := z.DgraphClientWithGroot(":9180")
// dg := z.DgraphClient(":9180")
ctx := context.Background()
op := api.Operation{DropAll: true}

// The following piece of code shows how one can set metadata with
Expand All @@ -111,13 +145,20 @@ func TestIncrement(t *testing.T) {
ctx := metadata.NewOutgoingContext(context.Background(), md)
x.Check(dg.Alter(ctx, &op))

cnt, err := process(dg, false, pred)
conf := viper.New()
conf.Set("pred", "counter.val")
cnt, err := process(dg, conf)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
t.Logf("Initial value: %d\n", cnt.Val)
}

return dg
}

func TestIncrement(t *testing.T) {
dg := setup(t)
val := increment(t, dg)
t.Logf("Increment stage done. Got value: %d\n", val)
read(t, dg, val)
Expand All @@ -127,3 +168,52 @@ func TestIncrement(t *testing.T) {
read(t, dg, val)
t.Logf("Read stage done with value: %d\n", val)
}

func TestBestEffort(t *testing.T) {
dg := setup(t)

var done int32
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; ; i++ {
incrementInLoop(t, dg, 5)
if atomic.LoadInt32(&done) > 0 {
return
}
}
}()
go func() {
defer wg.Done()
time.Sleep(time.Second)
readBestEffort(t, dg, "counter.val", 1000)
atomic.AddInt32(&done, 1)
}()
wg.Wait()
t.Logf("Write/Best-Effort read stage OK.")
}

func TestBestEffortOnly(t *testing.T) {
dg := setup(t)
readBestEffort(t, dg, fmt.Sprintf("counter.val.%d", rand.Int()), 1)
time.Sleep(time.Second)

doneCh := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
readBestEffort(t, dg, fmt.Sprintf("counter.val.%d", rand.Int()), 1)
}
doneCh <- struct{}{}
}()

timer := time.NewTimer(15 * time.Second)
defer timer.Stop()

select {
case <-timer.C:
t.FailNow()
case <-doneCh:
}
t.Logf("Best-Effort only reads with multiple preds OK.")
}
7 changes: 7 additions & 0 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
sort.Strings(preds)
g.Checksum = farm.Fingerprint64([]byte(strings.Join(preds, "")))
}
if n.AmLeader() {
// It is important to push something to Oracle updates channel, so the subscribers would
// get the latest checksum that we calculated above. Otherwise, if all the queries are
// best effort queries which don't create any transaction, then the OracleDelta never
// gets sent to Alphas, causing their group checksum to mismatch and never converge.
n.server.orc.updates <- &pb.OracleDelta{}
}
}()

if tablet.GroupId == 0 {
Expand Down
28 changes: 19 additions & 9 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,30 +477,40 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
return resp, err
}

// if err = validateQuery(parsedReq.Query); err != nil {
// return resp, err
// }

var queryRequest = query.QueryRequest{
Latency: &l,
GqlQuery: &parsedReq,
}
// Here we try our best effort to not contact Zero for a timestamp. If we succeed,
// then we use the max known transaction ts value (from ProcessDelta) for a read-only query.
// If we haven't processed any updates yet then fall back to getting TS from Zero.
switch {
case req.BestEffort:
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("be", true)}, "")
case req.ReadOnly:
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("ro", true)}, "")
default:
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("no", true)}, "")
}
if req.BestEffort {
// Sanity: check that request is read-only too.
if !req.ReadOnly {
return resp, x.Errorf("A best effort query must be read-only.")
}
req.StartTs = posting.Oracle().MaxAssigned()
queryRequest.Cache = worker.NoTxnCache
}
if req.StartTs == 0 {
req.StartTs = State.getTimestamp(req.ReadOnly)
}
resp.Txn = &api.TxnContext{
StartTs: req.StartTs,
}
queryRequest.ReadTs = req.StartTs
resp.Txn = &api.TxnContext{StartTs: req.StartTs}
annotateStartTs(span, req.StartTs)

var queryRequest = query.QueryRequest{
Latency: &l,
GqlQuery: &parsedReq,
ReadTs: req.StartTs,
}

// Core processing happens here.
var er query.ExecuteResult
if er, err = queryRequest.Process(ctx); err != nil {
Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ message Query {
bool expand_all = 10; // expand all language variants.

uint64 read_ts = 13;
int32 cache = 14;
}

message ValueList {
Expand Down
Loading

0 comments on commit 7a3bb09

Please sign in to comment.