diff --git a/conn/node.go b/conn/node.go
index 81a93a390d6..94857712cbf 100644
--- a/conn/node.go
+++ b/conn/node.go
@@ -23,6 +23,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/intern"
 	"github.com/dgraph-io/dgraph/raftwal"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/golang/glog"
 	"golang.org/x/net/context"
 )
 
@@ -124,9 +125,9 @@ func (n *Node) Raft() raft.Node {
 // SetConfState would store the latest ConfState generated by ApplyConfChange.
 func (n *Node) SetConfState(cs *raftpb.ConfState) {
+	glog.Infof("Setting conf state to %+v\n", cs)
 	n.Lock()
 	defer n.Unlock()
-	x.Printf("Setting conf state to %+v\n", cs)
 	n._confState = cs
 }
 
@@ -215,7 +216,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
 		return
 	}
 	if !raft.IsEmptySnap(sp) {
-		x.Printf("Found Snapshot, Metadata: %+v\n", sp.Metadata)
+		glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
 		restart = true
 		idx = sp.Metadata.Index
 	}
@@ -226,7 +227,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
 		return
 	}
 	if !raft.IsEmptyHardState(hd) {
-		x.Printf("Found hardstate: %+v\n", hd)
+		glog.Infof("Found hardstate: %+v\n", hd)
 		restart = true
 	}
 
@@ -235,7 +236,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
 	if rerr != nil {
 		return
 	}
-	x.Printf("Group %d found %d entries\n", n.RaftContext.Group, num)
+	glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
 	// We'll always have at least one entry.
 	if num > 1 {
 		restart = true
@@ -292,7 +293,7 @@ func (n *Node) BatchAndSendMessages() {
 				if exists := failedConn[to]; !exists {
 					// So that we print error only the first time we are not able to connect.
 					// Otherwise, the log is polluted with multiple errors.
-					x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
+					glog.Warningf("No healthy connection to node Id: %d addr: [%s], err: %v\n",
 						to, addr, err)
 					failedConn[to] = true
 				}
@@ -325,7 +326,8 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {
 	go func() {
 		_, err := c.RaftMessage(ctx, batch)
 		if err != nil {
-			x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
+			glog.Warningf("Error while sending message to node with addr: %s, err: %v\n",
+				pool.Addr, err)
 		}
 		ch <- err
 	}()
@@ -356,7 +358,7 @@ func (n *Node) Connect(pid uint64, addr string) {
 	// a nil *pool.
 	if addr == n.MyAddr {
 		// TODO: Note this fact in more general peer health info somehow.
-		x.Printf("Peer %d claims same host as me\n", pid)
+		glog.Infof("Peer %d claims same host as me\n", pid)
 		n.SetPeer(pid, addr)
 		return
 	}
@@ -387,7 +389,7 @@ func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) erro
 		if cctx.Err() != nil {
 			return errInternalRetry
 		}
-		x.Printf("Error while proposing conf change: %v", err)
+		glog.Warningf("Error while proposing conf change: %v", err)
 		return err
 	}
 	select {
@@ -419,8 +421,8 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
 	}
 	err = errInternalRetry
 	for err == errInternalRetry {
-		x.Printf("Trying to add %d to cluster. Addr: %v\n", pid, addr)
-		x.Printf("Current confstate at %d: %+v\n", n.Id, n.ConfState())
+		glog.Infof("Trying to add %d to cluster. Addr: %v\n", pid, addr)
+		glog.Infof("Current confstate at %d: %+v\n", n.Id, n.ConfState())
 		err = n.proposeConfChange(ctx, cc)
 	}
 	return err
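Reviewer note: the conn/node.go hunks above move ad-hoc x.Printf logging onto glog's leveled API (Infof for routine state changes, Warningf for connection failures). For anyone unfamiliar with glog, here is a minimal, self-contained sketch of the calls and flags this diff relies on; it is illustrative code, not part of the change:

```go
// Minimal glog usage sketch. Run with: go run main.go -logtostderr -v=2
package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	flag.Parse() // glog registers -v, -vmodule, -logtostderr, etc. on the stdlib flag set.

	glog.Infof("Setting conf state to %+v", []uint64{1, 2, 3})  // INFO severity.
	glog.Warningf("No healthy connection to node Id: %d", 2)    // WARNING severity.
	glog.Errorf("Error while proposing conf change: %v", "err") // ERROR severity.

	// V(2) output is emitted only when -v >= 2 (or a -vmodule match),
	// the same guard edgraph/server.go uses for ALTER ops further down.
	if glog.V(2) {
		glog.Info("verbose detail, printed only at -v=2 or higher")
	}
	glog.Flush() // glog buffers writes; flush before exit.
}
```

Severity lines are always recorded regardless of -v; the V(n) guard is what keeps per-request logging (for example the V(3) query dump later in this diff) free when verbosity is off.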
Addr: %v\n", pid, addr) + glog.Infof("Current confstate at %d: %+v\n", n.Id, n.ConfState()) err = n.proposeConfChange(ctx, cc) } return err diff --git a/contrib/integration/acctupsert/main.go b/contrib/integration/acctupsert/main.go index 2b000e1675d..8e605b89882 100644 --- a/contrib/integration/acctupsert/main.go +++ b/contrib/integration/acctupsert/main.go @@ -12,6 +12,7 @@ import ( "encoding/json" "flag" "fmt" + "math/rand" "strings" "sync" "sync/atomic" @@ -33,6 +34,7 @@ var ( firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"} lasts = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"} ages = []int{20, 25, 30, 35} + types = []string{"CEO", "COO", "CTO", "CFO"} ) type account struct { @@ -139,6 +141,7 @@ func tryUpsert(c *dgo.Dgraph, acc account) error { { get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) { uid + expand(_all_) {uid} } } `, acc.first, acc.last, acc.age) @@ -153,6 +156,8 @@ func tryUpsert(c *dgo.Dgraph, acc account) error { x.Check(json.Unmarshal(resp.GetJson(), &decode)) x.AssertTrue(len(decode.Get) <= 1) + t := rand.Intn(len(types)) + var uid string if len(decode.Get) == 1 { x.AssertTrue(decode.Get[0].Uid != nil) @@ -162,8 +167,9 @@ func tryUpsert(c *dgo.Dgraph, acc account) error { _:acct %q . _:acct %q . _:acct "%d"^^ . - `, - acc.first, acc.last, acc.age, + _:acct <%s> "" . + `, + acc.first, acc.last, acc.age, types[t], ) mu := &api.Mutation{SetNquads: []byte(nqs)} assigned, err := txn.Mutate(ctx, mu) @@ -172,7 +178,6 @@ func tryUpsert(c *dgo.Dgraph, acc account) error { } uid = assigned.GetUids()["acct"] x.AssertTrue(uid != "") - } nq := fmt.Sprintf(` diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index 1b6a4b9147d..ac22f1b29be 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -8,6 +8,7 @@ package cmd import ( + goflag "flag" "fmt" "os" @@ -19,6 +20,7 @@ import ( "github.com/dgraph-io/dgraph/dgraph/cmd/zero" "github.com/dgraph-io/dgraph/x" "github.com/spf13/cobra" + flag "github.com/spf13/pflag" "github.com/spf13/viper" ) @@ -40,6 +42,7 @@ cluster. // Execute adds all child commands to the root command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the rootCmd. func Execute() { + goflag.Parse() if err := RootCmd.Execute(); err != nil { fmt.Println(err) os.Exit(1) @@ -61,6 +64,7 @@ func init() { RootCmd.PersistentFlags().Bool("expose_trace", false, "Allow trace endpoint to be accessible from remote") rootConf.BindPFlags(RootCmd.PersistentFlags()) + flag.CommandLine.AddGoFlagSet(goflag.CommandLine) var subcommands = []*x.SubCommand{ &bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug, diff --git a/edgraph/server.go b/edgraph/server.go index 86083ddd17b..a3c7b93caf9 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -35,6 +35,7 @@ import ( "github.com/dgraph-io/dgraph/types/facets" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" "github.com/pkg/errors" ) @@ -235,6 +236,10 @@ func (s *ServerState) getTimestamp() uint64 { } func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) { + if glog.V(2) { + glog.Infof("Received ALTER op: %+v", op) + defer glog.Infof("ALTER op: %+v done", op) + } if op.Schema == "" && op.DropAttr == "" && !op.DropAll { // Must have at least one field set. This helps users if they attempt // to set a field but use the wrong name (could be decoded from JSON). 
diff --git a/edgraph/server.go b/edgraph/server.go
index 86083ddd17b..a3c7b93caf9 100644
--- a/edgraph/server.go
+++ b/edgraph/server.go
@@ -35,6 +35,7 @@ import (
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/golang/glog"
 	"github.com/pkg/errors"
 )
 
@@ -235,6 +236,10 @@ func (s *ServerState) getTimestamp() uint64 {
 }
 
 func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
+	if glog.V(2) {
+		glog.Infof("Received ALTER op: %+v", op)
+		defer glog.Infof("ALTER op: %+v done", op)
+	}
 	if op.Schema == "" && op.DropAttr == "" && !op.DropAll {
 		// Must have at least one field set. This helps users if they attempt
 		// to set a field but use the wrong name (could be decoded from JSON).
@@ -397,6 +402,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
 // This method is used to execute the query and return the response to the
 // client as a protocol buffer message.
 func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
+	if glog.V(3) {
+		glog.Infof("Got a query: %+v", req)
+	}
 	if err := x.HealthCheck(); err != nil {
 		if tr, ok := trace.FromContext(ctx); ok {
 			tr.LazyPrintf("Request rejected %v", err)
diff --git a/posting/list.go b/posting/list.go
index e8e96cfede8..32d60ccaa45 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -914,6 +914,8 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
 // If list consists of one or more languages, first available value is returned; if no language
 // from list match the values, processing is the same as for empty list.
 func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
+	l.RLock() // All public methods should acquire locks, while private ones should assert them.
+	defer l.RUnlock()
 	p, err := l.postingFor(readTs, langs)
 	if err != nil {
 		return rval, err
@@ -922,8 +924,7 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err
 }
 
 func (l *List) postingFor(readTs uint64, langs []string) (p *intern.Posting, rerr error) {
-	l.RLock()
-	defer l.RUnlock()
+	l.AssertRLock() // Avoid recursive locking by asserting a lock here.
 	return l.postingForLangs(readTs, langs)
 }
diff --git a/raftwal/storage.go b/raftwal/storage.go
index b3f5760b24d..b016c2a1215 100644
--- a/raftwal/storage.go
+++ b/raftwal/storage.go
@@ -17,6 +17,7 @@ import (
 	"github.com/coreos/etcd/raft"
 	pb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/dgraph-io/badger"
+	"github.com/golang/glog"
 	"golang.org/x/net/trace"
 
 	"github.com/dgraph-io/dgraph/x"
@@ -65,13 +66,27 @@ func (u *txnUnifier) Cancel() {
 
 type localCache struct {
 	sync.RWMutex
-	snap pb.Snapshot
+	firstIndex uint64
+	snap       pb.Snapshot
+}
+
+func (c *localCache) setFirst(first uint64) {
+	c.Lock()
+	defer c.Unlock()
+	c.firstIndex = first
+}
+
+func (c *localCache) first() uint64 {
+	c.RLock()
+	defer c.RUnlock()
+	return c.firstIndex
 }
 
 func (c *localCache) setSnapshot(s pb.Snapshot) {
 	c.Lock()
 	defer c.Unlock()
 	c.snap = s
+	c.firstIndex = 0 // Reset firstIndex.
 }
 
 func (c *localCache) snapshot() pb.Snapshot {
@@ -240,7 +255,16 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {
 	if !raft.IsEmptySnap(snap) {
 		return snap.Metadata.Index + 1, nil
 	}
+	if first := w.cache.first(); first > 0 {
+		return first, nil
+	}
 	index, err := w.seekEntry(nil, 0, false)
+	if err == nil {
+		glog.V(2).Infof("Setting first index: %d", index+1)
+		w.cache.setFirst(index + 1)
+	} else {
+		glog.Errorf("While seekEntry. Error: %v", err)
+	}
 	return index + 1, err
 }
 
@@ -549,11 +573,13 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr erro
 }
 
 func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
+	glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
 	first, err := w.FirstIndex()
 	if err != nil {
 		return err
 	}
 	if i < first {
+		glog.Errorf("i=%d<first=%d, ErrSnapOutOfDate", i, first)
 		return raft.ErrSnapOutOfDate
 	}
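Reviewer note: the raftwal/storage.go change turns FirstIndex from a Badger seek on every call into a cache hit, with setSnapshot zeroing the cached value so a newer snapshot forces recomputation. Here is the pattern in isolation, as a standalone sketch; the seek callback stands in for the real seekEntry:

```go
package main

import (
	"fmt"
	"sync"
)

// firstIndexCache mirrors the fields added to localCache in the diff.
type firstIndexCache struct {
	sync.RWMutex
	firstIndex uint64 // 0 means "not cached".
}

func (c *firstIndexCache) first() uint64 {
	c.RLock()
	defer c.RUnlock()
	return c.firstIndex
}

func (c *firstIndexCache) setFirst(v uint64) {
	c.Lock()
	defer c.Unlock()
	c.firstIndex = v
}

// firstIndex does the read-through: serve from cache when present,
// otherwise pay for one seek and remember the answer.
func firstIndex(c *firstIndexCache, seek func() uint64) uint64 {
	if first := c.first(); first > 0 {
		return first
	}
	index := seek() + 1
	c.setFirst(index)
	return index
}

func main() {
	c := &firstIndexCache{}
	seeks := 0
	seek := func() uint64 { seeks++; return 41 }

	fmt.Println(firstIndex(c, seek), firstIndex(c, seek)) // 42 42
	fmt.Println("seeks:", seeks)                          // 1

	c.setFirst(0) // What setSnapshot does: invalidate on a new snapshot.
	firstIndex(c, seek)
	fmt.Println("seeks after invalidation:", seeks) // 2
}
```

In the diff itself, FirstIndex prefers snapshot.Metadata.Index+1 whenever a snapshot exists, so the cache only serves the no-snapshot path; that is why resetting it inside setSnapshot is enough to keep it coherent.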
ReadTs: %d.\n", snap.Index, snap.ReadTs) data, err := snap.Marshal() x.Check(err) - // We can now discard all invalid versions of keys below this ts. + // We can now discard all invalid versions of keys below this ts. pstore.SetDiscardTs(snap.ReadTs) return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data) @@ -571,6 +584,7 @@ func (n *node) Run() { done := make(chan struct{}) go func() { <-n.closer.HasBeenClosed() + glog.Infof("Stopping node.Run") if peerId, has := groups().MyPeer(); has && n.AmLeader() { n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId) time.Sleep(time.Second) // Let transfer happen. @@ -950,6 +964,13 @@ func (n *node) InitAndStartNode() { sp, err := n.Store.Snapshot() x.Checkf(err, "Unable to get existing snapshot") if !raft.IsEmptySnap(sp) { + // It is important that we pick up the conf state here. + // Otherwise, we'll lose the store conf state, and it would get + // overwritten with an empty state when a new snapshot is taken. + // This causes a node to just hang on restart, because it finds a + // zero-member Raft group. + n.SetConfState(&sp.Metadata.ConfState) + members := groups().members(n.gid) for _, id := range sp.Metadata.ConfState.Nodes { m, ok := members[id] @@ -959,8 +980,9 @@ func (n *node) InitAndStartNode() { } } n.SetRaft(raft.RestartNode(n.Cfg)) + glog.V(2).Infoln("Restart node complete") } else { - x.Printf("New Node for group: %d\n", n.gid) + glog.Infof("New Node for group: %d\n", n.gid) if _, hasPeer := groups().MyPeer(); hasPeer { // Get snapshot before joining peers as it can take time to retrieve it and we dont // want the quorum to be inactive when it happens. diff --git a/x/lock.go b/x/lock.go index 3acc7f5f90d..be243d34853 100644 --- a/x/lock.go +++ b/x/lock.go @@ -14,6 +14,7 @@ import ( // SafeMutex can be used in place of sync.RWMutex type SafeMutex struct { + // m deadlock.RWMutex // Very useful for detecting locking issues. m sync.RWMutex wait *SafeWait writer int32