diff --git a/raft_handlers.go b/raft_handlers.go index efd45ec5c2b..abc7ad843eb 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -38,72 +38,115 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { // Response to vote request func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} - err := decodeJsonRequest(req, rvreq) - if err == nil { - debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName) - if resp := r.RequestVote(rvreq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - warnf("[vote] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + + if _, err := rvreq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + warnf("[recv] BADREQUEST %s/vote [%v]", r.url, err) + return + } + + debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName) + + resp := r.RequestVote(rvreq) + + if resp == nil { + warn("[vote] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if _, err := resp.Encode(w); err != nil { + warn("[vote] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } } // Response to append entries request func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.AppendEntriesRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries)) + if _, err := aereq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + warnf("[recv] BADREQUEST %s/log/append [%v]", r.url, err) + return + } + + debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries)) - r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) + r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) - if resp := r.AppendEntries(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - if !resp.Success { - debugf("[Append Entry] Step back") - } - return - } + resp := r.AppendEntries(aereq) + + if resp == nil { + warn("[ae] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if !resp.Success { + debugf("[Append Entry] Step back") + } + + if _, err := resp.Encode(w); err != nil { + warn("[ae] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return } - warnf("[Append Entry] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) } // Response to recover from snapshot request func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - debugf("[recv] POST %s/snapshot/ ", r.url) - if resp := r.RequestSnapshot(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + ssreq := &raft.SnapshotRequest{} + + if _, err := ssreq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + warnf("[recv] BADREQUEST %s/snapshot [%v]", r.url, err) + return + } + + debugf("[recv] POST %s/snapshot", r.url) + + resp := r.RequestSnapshot(ssreq) + + if resp == nil { + warn("[ss] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if _, err := resp.Encode(w); err != nil { + warn("[ss] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } } // Response to recover from snapshot request func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRecoveryRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - debugf("[recv] POST %s/snapshotRecovery/ ", r.url) - if resp := r.SnapshotRecoveryRequest(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) + ssrreq := &raft.SnapshotRecoveryRequest{} + + if _, err := ssrreq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", r.url, err) + return + } + + debugf("[recv] POST %s/snapshotRecovery", r.url) + + resp := r.SnapshotRecoveryRequest(ssrreq) + + if resp == nil { + warn("[ssr] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if _, err := resp.Encode(w); err != nil { + warn("[ssr] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } } // Get the port that listening for etcd connecting of the server diff --git a/transporter.go b/transporter.go index ac533c239ac..f9aed0f77dd 100644 --- a/transporter.go +++ b/transporter.go @@ -19,7 +19,6 @@ package main import ( "bytes" "crypto/tls" - "encoding/json" "fmt" "io" "net" @@ -76,10 +75,12 @@ func dialWithTimeout(network, addr string) (net.Conn, error) { // Sends AppendEntries RPCs to a peer when the server is the leader. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { - var aersp *raft.AppendEntriesResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + if _, err := req.Encode(&b); err != nil { + warn("transporter.ae.encoding.error:", err) + return nil + } size := b.Len() @@ -108,6 +109,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P if ok { thisFollowerStats.Fail() } + return nil } else { if ok { thisFollowerStats.Succ(end.Sub(start)) @@ -119,24 +121,28 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P t.CancelWhenTimeout(httpRequest) - aersp = &raft.AppendEntriesResponse{} - if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp + aeresp := &raft.AppendEntriesResponse{} + if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF { + warn("transporter.ae.decoding.error:", err) + return nil } - + return aeresp } - return aersp + return nil } // Sends RequestVote RPCs to a peer when the server is the candidate. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { - var rvrsp *raft.RequestVoteResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + + if _, err := req.Encode(&b); err != nil { + warn("transporter.vr.encoding.error:", err) + return nil + } u, _ := nameToRaftURL(peer.Name) - debugf("Send Vote to %s", u) + debugf("Send Vote from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) @@ -150,28 +156,31 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req t.CancelWhenTimeout(httpRequest) rvrsp := &raft.RequestVoteResponse{} - if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { - return rvrsp + if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF { + warn("transporter.vr.decoding.error:", err) + return nil } - + return rvrsp } - return rvrsp + return nil } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { - var aersp *raft.SnapshotResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + + if _, err := req.Encode(&b); err != nil { + warn("transporter.ss.encoding.error:", err) + return nil + } u, _ := nameToRaftURL(peer.Name) - debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) + debugf("Send Snapshot Request from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) if err != nil { - debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) + debugf("Cannot send Snapshot Request to %s : %s", u, err) } if resp != nil { @@ -179,69 +188,68 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, t.CancelWhenTimeout(httpRequest) - aersp = &raft.SnapshotResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - - return aersp + ssrsp := &raft.SnapshotResponse{} + if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF { + warn("transporter.ss.decoding.error:", err) + return nil } + return ssrsp } - - return aersp + return nil } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { - var aersp *raft.SnapshotRecoveryResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + + if _, err := req.Encode(&b); err != nil { + warn("transporter.ss.encoding.error:", err) + return nil + } u, _ := nameToRaftURL(peer.Name) - debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) + debugf("Send Snapshot Recovery from %s to %s", server.Name(), u) - resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) if err != nil { - debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) + debugf("Cannot send Snapshot Recovery to %s : %s", u, err) } if resp != nil { defer resp.Body.Close() - aersp = &raft.SnapshotRecoveryResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp + t.CancelWhenTimeout(httpRequest) + + ssrrsp := &raft.SnapshotRecoveryResponse{} + if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF { + warn("transporter.ssr.decoding.error:", err) + return nil } + return ssrrsp } + return nil - return aersp } // Send server side POST request func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("POST", urlStr, body) - resp, err := t.client.Do(req) - return resp, req, err - } // Send server side GET request func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("GET", urlStr, nil) - resp, err := t.client.Do(req) - return resp, req, err } -// Cancel the on fly HTTP transaction when timeout happens +// Cancel the on fly HTTP transaction when timeout happens. func (t *transporter) CancelWhenTimeout(req *http.Request) { go func() { - time.Sleep(ElectionTimeout) + time.Sleep(tranTimeout) t.transport.CancelRequest(req) }() }