Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GraphQL): Zero HTTP endpoints are now available at GraphQL admin (GRAPHQL-1118) #6649

Merged
merged 18 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 16 additions & 39 deletions dgraph/cmd/zero/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func (st *state) removeNode(w http.ResponseWriter, r *http.Request) {
return
}

if err := st.zero.removeNode(context.Background(), nodeId, uint32(groupId)); err != nil {
if _, err := st.zero.RemoveNode(context.Background(), &pb.RemoveNodeRequest{NodeId: nodeId,
GroupId: uint32(groupId)}); err != nil {
x.SetStatus(w, x.Error, err.Error())
return
}
Expand Down Expand Up @@ -159,50 +160,26 @@ func (st *state) moveTablet(w http.ResponseWriter, r *http.Request) {
groupId, ok := intFromQueryParam(w, r, "group")
if !ok {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, fmt.Sprintf(
"Query parameter 'group' should contain a valid integer."))
return
}
dstGroup := uint32(groupId)
knownGroups := st.zero.KnownGroups()
var isKnown bool
for _, grp := range knownGroups {
if grp == dstGroup {
isKnown = true
break
}
}
if !isKnown {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, fmt.Sprintf("Group: [%d] is not a known group.",
dstGroup))
return
}

tab := st.zero.ServingTablet(tablet)
if tab == nil {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, fmt.Sprintf("No tablet found for: %s", tablet))
return
}

srcGroup := tab.GroupId
if srcGroup == dstGroup {
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.ErrorInvalidRequest,
fmt.Sprintf("Tablet: [%s] is already being served by group: [%d]", tablet, srcGroup))
"Query parameter 'group' should contain a valid integer.")
return
}
dstGroup := uint32(groupId)

if err := st.zero.movePredicate(tablet, srcGroup, dstGroup); err != nil {
glog.Errorf("While moving predicate %s from %d -> %d. Error: %v",
tablet, srcGroup, dstGroup, err)
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.Error, err.Error())
var resp *pb.Status
var err error
if resp, err = st.zero.MoveTablet(context.Background(), &pb.MoveTabletRequest{Tablet: tablet,
DstGroup: dstGroup}); err != nil {
if resp.GetMsg() == x.ErrorInvalidRequest {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
} else {
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.Error, err.Error())
}
return
}
_, err := fmt.Fprintf(w, "Predicate: [%s] moved from group [%d] to [%d]",
tablet, srcGroup, dstGroup)
_, err = fmt.Fprint(w, resp.GetMsg())
if err != nil {
glog.Warningf("Error while writing response: %+v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions dgraph/cmd/zero/license_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package zero

import (
"bytes"
"context"
"io/ioutil"
"math"
Expand Down Expand Up @@ -125,7 +124,7 @@ func (st *state) applyEnterpriseLicense(w http.ResponseWriter, r *http.Request)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err := st.zero.applyLicense(ctx, bytes.NewReader(b)); err != nil {
if _, err := st.zero.ApplyLicense(ctx, &pb.ApplyLicenseRequest{License: b}); err != nil {
w.WriteHeader(http.StatusBadRequest)
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -143,7 +142,7 @@ func (s *Server) applyLicenseFile(path string) {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err = s.applyLicense(ctx, bytes.NewReader(content)); err != nil {
if _, err = s.ApplyLicense(ctx, &pb.ApplyLicenseRequest{License: content}); err != nil {
glog.Infof("Unable to apply license at %v due to error %v", path, err)
}
}
15 changes: 10 additions & 5 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ instances to achieve high-availability.
flag.StringP("wal", "w", "zw", "Directory storing WAL.")
flag.Duration("rebalance_interval", 8*time.Minute, "Interval for trying a predicate move.")
flag.String("enterprise_license", "", "Path to the enterprise license file.")
flag.Bool("enable_admin_http", true,
"Turn on/off the admin endpoints exposed over HTTP port for Zero.")
}

func setupListener(addr string, port int, kind string) (listener net.Listener, err error) {
Expand Down Expand Up @@ -230,11 +232,14 @@ func run() {
st.serveHTTP(httpListener)

http.HandleFunc("/health", st.pingResponse)
http.HandleFunc("/state", st.getState)
http.HandleFunc("/removeNode", st.removeNode)
http.HandleFunc("/moveTablet", st.moveTablet)
http.HandleFunc("/assign", st.assign)
http.HandleFunc("/enterpriseLicense", st.applyEnterpriseLicense)
// the following endpoints are disabled only if the flag is explicitly set to false
if Zero.Conf.GetBool("enable_admin_http") {
http.HandleFunc("/state", st.getState)
http.HandleFunc("/removeNode", st.removeNode)
http.HandleFunc("/moveTablet", st.moveTablet)
http.HandleFunc("/assign", st.assign)
http.HandleFunc("/enterpriseLicense", st.applyEnterpriseLicense)
}
zpages.Handle(http.DefaultServeMux, "/z")

// This must be here. It does not work if placed before Grpc init.
Expand Down
43 changes: 43 additions & 0 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,49 @@ func (s *Server) rebalanceTablets() {
}
}

// MoveTablet can be used to move a tablet to a specific group.
// It takes in tablet and destination group as argument.
// It returns a *pb.Status to be used by the `/moveTablet` HTTP handler in Zero.
func (s *Server) MoveTablet(ctx context.Context, req *pb.MoveTabletRequest) (*pb.Status, error) {
if !s.Node.AmLeader() {
return &pb.Status{Code: 1, Msg: x.Error}, errNotLeader
}

knownGroups := s.KnownGroups()
var isKnown bool
for _, grp := range knownGroups {
if grp == req.DstGroup {
isKnown = true
break
}
}
if !isKnown {
return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
fmt.Errorf("Group: [%d] is not a known group.", req.DstGroup)
}

tab := s.ServingTablet(req.Tablet)
if tab == nil {
return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
fmt.Errorf("No tablet found for: %s", req.Tablet)
}

srcGroup := tab.GroupId
if srcGroup == req.DstGroup {
return &pb.Status{Code: 1, Msg: x.ErrorInvalidRequest},
fmt.Errorf("Tablet: [%s] is already being served by group: [%d]", req.Tablet, srcGroup)
}

if err := s.movePredicate(req.Tablet, srcGroup, req.DstGroup); err != nil {
glog.Errorf("While moving predicate %s from %d -> %d. Error: %v",
req.Tablet, srcGroup, req.DstGroup, err)
return &pb.Status{Code: 1, Msg: x.Error}, err
}

return &pb.Status{Code: 0, Msg: fmt.Sprintf("Predicate: [%s] moved from group [%d] to [%d]",
req.Tablet, srcGroup, req.DstGroup)}, nil
}

// movePredicate is the main entry point for move predicate logic. This Zero must remain the leader
// for the entire duration of predicate move. If this Zero stops being the leader, the final
// proposal of reassigning the tablet to the destination would fail automatically.
Expand Down
46 changes: 27 additions & 19 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package zero

import (
"bytes"
"context"
"io"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -391,26 +391,32 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
return res, nil
}

// removeNode removes the given node from the given group.
// RemoveNode removes the given node from the given group.
// It's the user's responsibility to ensure that node doesn't come back again
// before calling the api.
func (s *Server) removeNode(ctx context.Context, nodeId uint64, groupId uint32) error {
if groupId == 0 {
return s.Node.ProposePeerRemoval(ctx, nodeId)
func (s *Server) RemoveNode(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error) {
if req.GroupId == 0 {
return nil, s.Node.ProposePeerRemoval(ctx, req.NodeId)
}
zp := &pb.ZeroProposal{}
zp.Member = &pb.Member{Id: nodeId, GroupId: groupId, AmDead: true}
if _, ok := s.state.Groups[groupId]; !ok {
return errors.Errorf("No group with groupId %d found", groupId)
zp.Member = &pb.Member{Id: req.NodeId, GroupId: req.GroupId, AmDead: true}
if _, ok := s.state.Groups[req.GroupId]; !ok {
return nil, errors.Errorf("No group with groupId %d found", req.GroupId)
}
if _, ok := s.state.Groups[groupId].Members[nodeId]; !ok {
return errors.Errorf("No node with nodeId %d found in group %d", nodeId, groupId)
if _, ok := s.state.Groups[req.GroupId].Members[req.NodeId]; !ok {
return nil, errors.Errorf("No node with nodeId %d found in group %d", req.NodeId,
req.GroupId)
}
if len(s.state.Groups[groupId].Members) == 1 && len(s.state.Groups[groupId].Tablets) > 0 {
return errors.Errorf("Move all tablets from group %d before removing the last node", groupId)
if len(s.state.Groups[req.GroupId].Members) == 1 && len(s.state.Groups[req.GroupId].
Tablets) > 0 {
return nil, errors.Errorf("Move all tablets from group %d before removing the last node",
req.GroupId)
}
if err := s.Node.proposeAndWait(ctx, zp); err != nil {
return nil, err
}

return s.Node.proposeAndWait(ctx, zp)
return &pb.Status{}, nil
}

// Connect is used by Alpha nodes to connect the very first time with group zero.
Expand Down Expand Up @@ -764,19 +770,21 @@ func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState
return ms, nil
}

func (s *Server) applyLicense(ctx context.Context, signedData io.Reader) error {
func (s *Server) ApplyLicense(ctx context.Context, req *pb.ApplyLicenseRequest) (*pb.Status,
error) {
var l license
signedData := bytes.NewReader(req.License)
if err := verifySignature(signedData, strings.NewReader(publicKey), &l); err != nil {
return errors.Wrapf(err, "while extracting enterprise details from the license")
return nil, errors.Wrapf(err, "while extracting enterprise details from the license")
}

numNodes := len(s.state.GetZeros())
for _, group := range s.state.GetGroups() {
numNodes += len(group.GetMembers())
}
if uint64(numNodes) > l.MaxNodes {
return errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. You have: [%v].",
l.MaxNodes, numNodes)
return nil, errors.Errorf("Your license only allows [%v] (Alpha + Zero) nodes. "+
"You have: [%v].", l.MaxNodes, numNodes)
}

proposal := &pb.ZeroProposal{
Expand All @@ -789,8 +797,8 @@ func (s *Server) applyLicense(ctx context.Context, signedData io.Reader) error {

err := s.Node.proposeAndWait(ctx, proposal)
if err != nil {
return errors.Wrapf(err, "while proposing enterprise license state to cluster")
return nil, errors.Wrapf(err, "while proposing enterprise license state to cluster")
}
glog.Infof("Enterprise license proposed to the cluster %+v", proposal)
return nil
return &pb.Status{}, nil
}
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/zero_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestRemoveNode(t *testing.T) {
Groups: map[uint32]*pb.Group{1: {Members: map[uint64]*pb.Member{}}},
},
}
err := server.removeNode(context.TODO(), 3, 1)
_, err := server.RemoveNode(context.TODO(), &pb.RemoveNodeRequest{NodeId: 3, GroupId: 1})
require.Error(t, err)
err = server.removeNode(context.TODO(), 1, 2)
_, err = server.RemoveNode(context.TODO(), &pb.RemoveNodeRequest{NodeId: 1, GroupId: 2})
require.Error(t, err)
}
80 changes: 80 additions & 0 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2674,6 +2674,86 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
}},
guardianData: `{"restore": {"code": "Failure"}}`,
},
{
name: "removeNode has guardian auth",
query: `
mutation {
removeNode(input: {nodeId: 1, groupId: 2147483640}) {
response {
code
}
}
}`,
queryName: "removeNode",
testGuardianAccess: true,
guardianErrs: x.GqlErrorList{{
Message: "resolving removeNode failed because rpc error: code = Unknown desc = No" +
" group with groupId 2147483640 found",
Locations: []x.Location{{Line: 3, Column: 8}},
}},
guardianData: `{"removeNode": null}`,
},
{
name: "moveTablet has guardian auth",
query: `
mutation {
moveTablet(input: {tablet: "non_existent_pred", groupId: 2147483640}) {
response {
code
message
}
}
}`,
queryName: "moveTablet",
testGuardianAccess: true,
guardianErrs: x.GqlErrorList{{
Message: "resolving moveTablet failed because rpc error: code = Unknown desc" +
" = Group: [2147483640] is not a known group.",
Locations: []x.Location{{Line: 3, Column: 8}},
}},
guardianData: `{"moveTablet": null}`,
},
{
name: "assign has guardian auth",
query: `
mutation {
assign(input: {what: UID, num: 0}) {
response {
startId
endId
readOnly
}
}
}`,
queryName: "assign",
testGuardianAccess: true,
guardianErrs: x.GqlErrorList{{
Message: "resolving assign failed because rpc error: code = Unknown desc" +
" = Nothing to be leased",
Locations: []x.Location{{Line: 3, Column: 8}},
}},
guardianData: `{"assign": null}`,
},
{
name: "enterpriseLicense has guardian auth",
query: `
mutation {
enterpriseLicense(input: {license: ""}) {
response {
code
}
}
}`,
queryName: "enterpriseLicense",
testGuardianAccess: true,
guardianErrs: x.GqlErrorList{{
Message: "resolving enterpriseLicense failed because rpc error: code = Unknown" +
" desc = while extracting enterprise details from the license: while decoding" +
" license file: EOF",
Locations: []x.Location{{Line: 3, Column: 8}},
}},
guardianData: `{"enterpriseLicense": null}`,
},
{
name: "getGQLSchema has guardian auth",
query: `
Expand Down
Loading