Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs related to best-effort queries #3125

Merged
merged 8 commits into from
Mar 13, 2019
4 changes: 3 additions & 1 deletion dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,9 @@ func run() {
}
}
otrace.ApplyConfig(otrace.Config{
DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Tracing)})
DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Tracing),
MaxAnnotationEventsPerSpan: 64,
})

// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/counter/increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func queryCounter(txn *dgo.Txn, pred string) (Counter, error) {

func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) {
ro := conf.GetBool("ro")
bo := conf.GetBool("bo")
be := conf.GetBool("be")
pred := conf.GetString("pred")
var txn *dgo.Txn

switch {
case bo:
case be:
txn = dg.NewReadOnlyTxn().BestEffort()
case ro:
txn = dg.NewReadOnlyTxn()
Expand All @@ -109,7 +109,7 @@ func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) {
if err != nil {
return Counter{}, err
}
if bo || ro {
if be || ro {
return counter, nil
}

Expand Down
129 changes: 113 additions & 16 deletions dgraph/cmd/counter/increment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,49 @@ package counter

import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/dgraph-io/dgo"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/z"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)

const N = 10
const pred = "counter"

func incrementInLoop(t *testing.T, dg *dgo.Dgraph, M int) int {
conf := viper.New()
conf.Set("pred", "counter.val")

var max int
for i := 0; i < M; i++ {
cnt, err := process(dg, conf)
if err != nil {
if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
// pass
} else {
t.Logf("Error while incrementing: %v\n", err)
}
} else {
if cnt.Val > max {
max = cnt.Val
}
}
}
t.Logf("Last value written by increment in loop: %d", max)
return max
}

func increment(t *testing.T, dg *dgo.Dgraph) int {
var max int
var mu sync.Mutex
Expand All @@ -50,26 +78,19 @@ func increment(t *testing.T, dg *dgo.Dgraph) int {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, false, pred)
if err != nil {
if strings.Index(err.Error(), "Transaction has been aborted") >= 0 {
// pass
} else {
t.Logf("Error while incrementing: %v\n", err)
}
} else {
storeMax(cnt.Val)
}
}
max := incrementInLoop(t, dg, N)
storeMax(max)
}()
}
wg.Wait()
return max
}

func read(t *testing.T, dg *dgo.Dgraph, expected int) {
cnt, err := process(dg, true, pred)
conf := viper.New()
conf.Set("pred", "counter.val")
conf.Set("ro", true)
cnt, err := process(dg, conf)
require.NoError(t, err)
ts := cnt.startTs
t.Logf("Readonly stage counter: %+v\n", cnt)
Expand All @@ -80,7 +101,7 @@ func read(t *testing.T, dg *dgo.Dgraph, expected int) {
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
cnt, err := process(dg, true, pred)
cnt, err := process(dg, conf)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
Expand All @@ -93,8 +114,28 @@ func read(t *testing.T, dg *dgo.Dgraph, expected int) {
wg.Wait()
}

func TestIncrement(t *testing.T) {
func readBestEffort(t *testing.T, dg *dgo.Dgraph, pred string, M int) {
conf := viper.New()
conf.Set("pred", pred)
conf.Set("be", true)
var last int
for i := 0; i < M; i++ {
cnt, err := process(dg, conf)
if err != nil {
t.Errorf("Error while reading: %v", err)
} else {
if last > cnt.Val {
t.Errorf("Current %d < Last %d", cnt.Val, last)
}
last = cnt.Val
}
}
t.Logf("Last value read by best effort: %d", last)
}

func setup(t *testing.T) *dgo.Dgraph {
dg := z.DgraphClientWithGroot(":9180")
// dg := z.DgraphClient(":9180")
ctx := context.Background()
op := api.Operation{DropAll: true}

Expand All @@ -105,13 +146,20 @@ func TestIncrement(t *testing.T) {
ctx = metadata.NewOutgoingContext(ctx, md)
x.Check(dg.Alter(ctx, &op))

cnt, err := process(dg, false, pred)
conf := viper.New()
conf.Set("pred", "counter.val")
cnt, err := process(dg, conf)
if err != nil {
t.Logf("Error while reading: %v\n", err)
} else {
t.Logf("Initial value: %d\n", cnt.Val)
}

return dg
}

func TestIncrement(t *testing.T) {
dg := setup(t)
val := increment(t, dg)
t.Logf("Increment stage done. Got value: %d\n", val)
read(t, dg, val)
Expand All @@ -121,3 +169,52 @@ func TestIncrement(t *testing.T) {
read(t, dg, val)
t.Logf("Read stage done with value: %d\n", val)
}

func TestBestEffort(t *testing.T) {
dg := setup(t)

var done int32
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; ; i++ {
incrementInLoop(t, dg, 5)
if atomic.LoadInt32(&done) > 0 {
return
}
}
}()
go func() {
defer wg.Done()
time.Sleep(time.Second)
readBestEffort(t, dg, "counter.val", 1000)
atomic.AddInt32(&done, 1)
}()
wg.Wait()
t.Logf("Write/Best-Effort read stage OK.")
}

func TestBestEffortOnly(t *testing.T) {
dg := setup(t)
readBestEffort(t, dg, fmt.Sprintf("counter.val.%d", rand.Int()), 1)
time.Sleep(time.Second)

doneCh := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
readBestEffort(t, dg, fmt.Sprintf("counter.val.%d", rand.Int()), 1)
}
doneCh <- struct{}{}
}()

timer := time.NewTimer(15 * time.Second)
defer timer.Stop()

select {
case <-timer.C:
t.FailNow()
case <-doneCh:
}
t.Logf("Best-Effort only reads with multiple preds OK.")
}
7 changes: 7 additions & 0 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
sort.Strings(preds)
g.Checksum = farm.Fingerprint64([]byte(strings.Join(preds, "")))
}
if n.AmLeader() {
// It is important to push something to Oracle updates channel, so the subscribers would
// get the latest checksum that we calculated above. Otherwise, if all the queries are
// best effort queries which don't create any transaction, then the OracleDelta never
// gets sent to Alphas, causing their group checksum to mismatch and never converge.
n.server.orc.updates <- &pb.OracleDelta{}
}
}()

if tablet.GroupId == 0 {
Expand Down
24 changes: 15 additions & 9 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,30 +572,36 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Respo
return resp, err
}

var queryRequest = query.QueryRequest{
Latency: &l,
GqlQuery: &parsedReq,
}
// Here we try our best effort to not contact Zero for a timestamp. If we succeed,
// then we use the max known transaction ts value (from ProcessDelta) for a read-only query.
// If we haven't processed any updates yet then fall back to getting TS from Zero.
switch {
case req.BestEffort:
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("be", true)}, "")
case req.ReadOnly:
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("ro", true)}, "")
default:
span.Annotate([]otrace.Attribute{otrace.BoolAttribute("no", true)}, "")
}
if req.BestEffort {
// Sanity: check that request is read-only too.
if !req.ReadOnly {
return resp, x.Errorf("A best effort query must be read-only.")
}
req.StartTs = posting.Oracle().MaxAssigned()
queryRequest.Cache = worker.NoTxnCache
}
if req.StartTs == 0 {
req.StartTs = State.getTimestamp(req.ReadOnly)
}
resp.Txn = &api.TxnContext{
StartTs: req.StartTs,
}
queryRequest.ReadTs = req.StartTs
resp.Txn = &api.TxnContext{StartTs: req.StartTs}
annotateStartTs(span, req.StartTs)

var queryRequest = query.QueryRequest{
Latency: &l,
GqlQuery: &parsedReq,
ReadTs: req.StartTs,
}

// Core processing happens here.
var er query.ExecutionResult
if er, err = queryRequest.Process(ctx); err != nil {
Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ message Query {
bool expand_all = 10; // expand all language variants.

uint64 read_ts = 13;
int32 cache = 14;
}

message ValueList {
Expand Down
Loading