diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go
index 228acc8c138..a0d8e88bd66 100644
--- a/dgraph/cmd/alpha/run.go
+++ b/dgraph/cmd/alpha/run.go
@@ -189,7 +189,7 @@ they form a Raft group and provide synchronous replication.
 	grpc.EnableTracing = false
 	flag.Bool("graphql_introspection", true, "Set to false for no GraphQL schema introspection")
-
+	flag.Bool("ludicrous_mode", false, "Run alpha in ludicrous mode")
 }
 
 func setupCustomTokenizers() {
@@ -578,6 +578,7 @@ func run() {
 		SnapshotAfter:  Alpha.Conf.GetInt("snapshot_after"),
 		AbortOlderThan: abortDur,
 		StartTime:      startTime,
+		LudicrousMode:  Alpha.Conf.GetBool("ludicrous_mode"),
 	}
 
 	setupCustomTokenizers()
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index 31bf27beaa7..0d43255999a 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -636,6 +636,10 @@ func (n *node) Run() {
 	go n.updateZeroMembershipPeriodically(closer)
 	go n.checkQuorum(closer)
 	go n.RunReadIndexLoop(closer, readStateCh)
+	if x.WorkerConfig.LudicrousMode {
+		closer.AddRunning(1)
+		go x.StoreSync(n.Store, closer)
+	}
 
 	// We only stop runReadIndexLoop after the for loop below has finished interacting with it.
 	// That way we know sending to readStateCh will not deadlock.
@@ -676,13 +680,13 @@ func (n *node) Run() {
 			}
 			n.SaveToStorage(&rd.HardState, rd.Entries, &rd.Snapshot)
 			timer.Record("disk")
-			if rd.MustSync {
+			span.Annotatef(nil, "Saved to storage")
+			if !x.WorkerConfig.LudicrousMode && rd.MustSync {
 				if err := n.Store.Sync(); err != nil {
 					glog.Errorf("Error while calling Store.Sync: %v", err)
 				}
 				timer.Record("sync")
 			}
-			span.Annotatef(nil, "Saved to storage")
 
 			if !raft.IsEmptySnap(rd.Snapshot) {
 				var state pb.MembershipState
diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go
index f52a65f7c3c..9f8bb534c5a 100644
--- a/dgraph/cmd/zero/run.go
+++ b/dgraph/cmd/zero/run.go
@@ -95,6 +95,7 @@ instances to achieve high-availability.
 	// about the status of supporting annotation logs through the datadog exporter
 	flag.String("datadog.collector", "", "Send opencensus traces to Datadog. As of now, the trace"+
 		" exporter does not support annotation logs and would discard them.")
+	flag.Bool("ludicrous_mode", false, "Run zero in ludicrous mode")
 }
 
 func setupListener(addr string, port int, kind string) (listener net.Listener, err error) {
@@ -170,6 +171,10 @@ func run() {
 		rebalanceInterval: Zero.Conf.GetDuration("rebalance_interval"),
 	}
 
+	x.WorkerConfig = x.WorkerOptions{
+		LudicrousMode: Zero.Conf.GetBool("ludicrous_mode"),
+	}
+
 	if opts.numReplicas < 0 || opts.numReplicas%2 == 0 {
 		log.Fatalf("ERROR: Number of replicas must be odd for consensus. Found: %d",
 			opts.numReplicas)
diff --git a/edgraph/server.go b/edgraph/server.go
index 477845dfc3c..983cd569811 100644
--- a/edgraph/server.go
+++ b/edgraph/server.go
@@ -271,10 +271,12 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
 	if len(qc.gmuList) == 0 {
 		return nil
 	}
-
 	if ctx.Err() != nil {
 		return ctx.Err()
 	}
+	if x.WorkerConfig.LudicrousMode {
+		qc.req.StartTs = worker.State.GetTimestamp(false)
+	}
 
 	start := time.Now()
 	defer func() {
@@ -323,6 +325,15 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
 	qc.span.Annotatef(nil, "Applying mutations: %+v", m)
 	resp.Txn, err = query.ApplyMutations(ctx, m)
 	qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)
+
+	if x.WorkerConfig.LudicrousMode {
+		// Mutations are automatically committed in case of ludicrous mode, so we don't
+		// need to manually commit.
+		resp.Txn.Keys = resp.Txn.Keys[:0]
+		resp.Txn.CommitTs = qc.req.StartTs
+		return err
+	}
+
 	if !qc.req.CommitNow {
 		if err == zero.ErrConflict {
 			err = status.Error(codes.FailedPrecondition, err.Error())
 		}
@@ -794,7 +805,7 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request, doAuth AuthMode)
 	// assigned in the processQuery function called below.
 	defer annotateStartTs(qc.span, qc.req.StartTs)
 	// For mutations, we update the startTs if necessary.
-	if isMutation && req.StartTs == 0 {
+	if isMutation && req.StartTs == 0 && !x.WorkerConfig.LudicrousMode {
 		start := time.Now()
 		req.StartTs = worker.State.GetTimestamp(false)
 		qc.latency.AssignTimestamp = time.Since(start)
@@ -825,7 +836,12 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
 	if len(qc.req.Query) == 0 {
 		return resp, nil
 	}
-
+	if ctx.Err() != nil {
+		return resp, ctx.Err()
+	}
+	if x.WorkerConfig.LudicrousMode {
+		qc.req.StartTs = posting.Oracle().MaxAssigned()
+	}
 	qr := query.Request{
 		Latency:  qc.latency,
 		GqlQuery: &qc.gqlRes,
diff --git a/posting/list.go b/posting/list.go
index 506393e30df..e4022fbc02c 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -398,6 +398,10 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
 	}
 	l.updateMutationLayer(mpost)
 
+	if x.WorkerConfig.LudicrousMode {
+		return nil
+	}
+
 	// We ensure that commit marks are applied to posting lists in the right
 	// order. We can do so by proposing them in the same order as received by the Oracle delta
 	// stream from Zero, instead of in goroutines.
diff --git a/posting/writer.go b/posting/writer.go
index bdd512fc57f..2d4fa37747d 100644
--- a/posting/writer.go
+++ b/posting/writer.go
@@ -118,6 +118,10 @@ func (w *TxnWriter) Flush() error {
 			glog.Errorf("Error while calling Sync from TxnWriter.Flush: %v", err)
 		}
 	}()
+	return w.Wait()
+}
+
+func (w *TxnWriter) Wait() error {
 	w.wg.Wait()
 	select {
 	case err := <-w.che:
diff --git a/worker/draft.go b/worker/draft.go
index b5af1897e3c..96f9052c88c 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -347,6 +347,14 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
 			span.Annotatef(nil, "While applying mutations: %v", err)
 			return err
 		}
+		if x.WorkerConfig.LudicrousMode {
+			ts := proposal.Mutations.StartTs
+			return n.commitOrAbort(proposal.Key, &pb.OracleDelta{
+				Txns: []*pb.TxnStatus{
+					{StartTs: ts, CommitTs: ts},
+				},
+			})
+		}
 		span.Annotate(nil, "Done")
 		return nil
 	}
@@ -554,12 +562,20 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
 	for _, status := range delta.Txns {
 		toDisk(status.StartTs, status.CommitTs)
 	}
-	if err := writer.Flush(); err != nil {
-		return errors.Wrapf(err, "while flushing to disk")
+	if x.WorkerConfig.LudicrousMode {
+		if err := writer.Wait(); err != nil {
+			glog.Errorf("Error while waiting to commit: %+v", err)
+		}
+	} else {
+		if err := writer.Flush(); err != nil {
+			return errors.Wrapf(err, "while flushing to disk")
+		}
 	}
 
 	g := groups()
-	atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()])
+	if delta.GroupChecksums != nil && delta.GroupChecksums[g.groupId()] > 0 {
+		atomic.StoreUint64(&g.deltaChecksum, delta.GroupChecksums[g.groupId()])
+	}
 
 	// Now advance Oracle(), so we can service waiting reads.
 	posting.Oracle().ProcessDelta(delta)
@@ -790,6 +806,13 @@ func (n *node) Run() {
 	go n.checkpointAndClose(done)
 	go n.ReportRaftComms()
+	if x.WorkerConfig.LudicrousMode {
+		closer := y.NewCloser(2)
+		defer closer.SignalAndWait()
+		go x.StoreSync(n.Store, closer)
+		go x.StoreSync(pstore, closer)
+	}
+
 	applied, err := n.Store.Checkpoint()
 	if err != nil {
 		glog.Errorf("While trying to find raft progress: %v", err)
 	}
@@ -907,18 +930,18 @@ func (n *node) Run() {
 			// Store the hardstate and entries. Note that these are not CommittedEntries.
 			n.SaveToStorage(&rd.HardState, rd.Entries, &rd.Snapshot)
 			timer.Record("disk")
-			if rd.MustSync {
-				if err := n.Store.Sync(); err != nil {
-					glog.Errorf("Error while calling Store.Sync: %+v", err)
-				}
-				timer.Record("sync")
-			}
 			if span != nil {
 				span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)",
 					len(rd.Entries),
 					raft.IsEmptySnap(rd.Snapshot),
 					raft.IsEmptyHardState(rd.HardState))
 			}
+			if !x.WorkerConfig.LudicrousMode && rd.MustSync {
+				if err := n.Store.Sync(); err != nil {
+					glog.Errorf("Error while calling Store.Sync: %+v", err)
+				}
+				timer.Record("sync")
+			}
 
 			// Now schedule or apply committed entries.
 			var proposals []*pb.Proposal
@@ -953,6 +976,10 @@ func (n *node) Run() {
 					if span := otrace.FromContext(pctx.Ctx); span != nil {
 						span.Annotate(nil, "Proposal found in CommittedEntries")
 					}
+					if x.WorkerConfig.LudicrousMode {
+						// Assuming that there will be no error while proposing.
+						n.Proposals.Done(proposal.Key, nil)
+					}
 				}
 				proposal.Index = entry.Index
 				proposals = append(proposals, proposal)
diff --git a/x/config.go b/x/config.go
index 4f7110abb81..23115dfad96 100644
--- a/x/config.go
+++ b/x/config.go
@@ -76,6 +76,8 @@ type WorkerOptions struct {
 	ProposedGroupId uint32
 	// StartTime is the start time of the alpha
 	StartTime time.Time
+	// LudicrousMode is super fast mode with fewer guarantees.
+	LudicrousMode bool
 }
 
 // WorkerConfig stores the global instance of the worker package's options.
diff --git a/x/x.go b/x/x.go
index f9fea0788ee..dde6f6f9e2e 100644
--- a/x/x.go
+++ b/x/x.go
@@ -889,3 +889,22 @@ func RunVlogGC(store *badger.DB, closer *y.Closer) {
 		}
 	}
 }
+
+type DB interface {
+	Sync() error
+}
+
+func StoreSync(db DB, closer *y.Closer) {
+	defer closer.Done()
+	ticker := time.NewTicker(1 * time.Second)
+	for {
+		select {
+		case <-ticker.C:
+			if err := db.Sync(); err != nil {
+				glog.Errorf("Error while calling db sync: %+v", err)
+			}
+		case <-closer.HasBeenClosed():
+			return
+		}
+	}
+}
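
Note (not part of the diff): below is a minimal, self-contained sketch of the periodic-fsync pattern that the new x.StoreSync helper and the DB interface in x/x.go implement, and that the zero and alpha Run() loops start when --ludicrous_mode is set. The package main wiring, the memDB stand-in, and the use of log instead of glog are assumptions made purely for illustration, and the badger import path assumes the v2 module that dgraph vendors.

// storesync_example.go -- illustrative sketch only; not part of the change above.
package main

import (
	"log"
	"time"

	"github.com/dgraph-io/badger/v2/y"
)

// DB mirrors the one-method interface added to x/x.go.
type DB interface {
	Sync() error
}

// memDB is a hypothetical stand-in for a real store (the raft WAL storage or
// the badger posting store, both of which expose Sync() error).
type memDB struct{ syncs int }

func (m *memDB) Sync() error {
	m.syncs++
	log.Printf("sync #%d", m.syncs)
	return nil
}

// storeSync follows x.StoreSync from the diff: call db.Sync() once per second
// until the closer is signalled, then mark this goroutine as done.
func storeSync(db DB, closer *y.Closer) {
	defer closer.Done()
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := db.Sync(); err != nil {
				log.Printf("Error while calling db sync: %+v", err)
			}
		case <-closer.HasBeenClosed():
			return
		}
	}
}

func main() {
	db := &memDB{}
	closer := y.NewCloser(1) // register the one storeSync goroutine up front
	go storeSync(db, closer)

	time.Sleep(3 * time.Second) // let a few periodic syncs happen
	closer.SignalAndWait()      // same shutdown pattern as `defer closer.SignalAndWait()` in node.Run()
}

In ludicrous mode the per-Ready Store.Sync() calls are skipped (see the `!x.WorkerConfig.LudicrousMode && rd.MustSync` checks above), so durability rests on this once-per-second background sync; a crash can lose roughly the last second of acknowledged writes, which is the "fewer guarantees" trade-off noted in x/config.go.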