Skip to content

Commit

Permalink
GraphQL Admin API: Support export, draining, shutdown and setting lru…
Browse files Browse the repository at this point in the history
…_mb operations (#4739)

This PR adds support for doing export, shutdown, setting `lru_mb` and draining using the GraphQL Admin API. The current `/admin` HTTP endpoints have been kept as before.
  • Loading branch information
pawanrawal authored Feb 7, 2020
1 parent 2eca86c commit c39bd3f
Show file tree
Hide file tree
Showing 11 changed files with 458 additions and 18 deletions.
15 changes: 6 additions & 9 deletions dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
return
}

close(shutdownCh)
close(worker.ShutdownCh)
w.Header().Set("Content-Type", "application/json")
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Server is shutting down"}`)))
}
Expand Down Expand Up @@ -135,22 +135,19 @@ func memoryLimitPutHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if memoryMB < worker.MinAllottedMemory {

if err := worker.UpdateLruMb(memoryMB); err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "lru_mb must be at least %.0f\n", worker.MinAllottedMemory)
fmt.Fprint(w, err.Error())
return
}

posting.Config.Mu.Lock()
posting.Config.AllottedMemory = memoryMB
posting.Config.Mu.Unlock()
w.WriteHeader(http.StatusOK)
}

func memoryLimitGetHandler(w http.ResponseWriter, r *http.Request) {
posting.Config.Mu.Lock()
posting.Config.Lock()
memoryMB := posting.Config.AllottedMemory
posting.Config.Mu.Unlock()
posting.Config.Unlock()

if _, err := fmt.Fprintln(w, memoryMB); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
10 changes: 4 additions & 6 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func setupServer(closer *y.Closer) {

go func() {
defer wg.Done()
<-shutdownCh
<-worker.ShutdownCh

// Stops grpc/http servers; Already accepted connections are not closed.
if err := grpcListener.Close(); err != nil {
Expand All @@ -507,8 +507,6 @@ func setupServer(closer *y.Closer) {
wg.Wait()
}

var shutdownCh chan struct{}

func run() {
bindall = Alpha.Conf.GetBool("bindall")

Expand Down Expand Up @@ -618,7 +616,7 @@ func run() {

// setup shutdown os signal handler
sdCh := make(chan os.Signal, 3)
shutdownCh = make(chan struct{})
worker.ShutdownCh = make(chan struct{})

defer func() {
signal.Stop(sdCh)
Expand All @@ -630,9 +628,9 @@ func run() {
var numShutDownSig int
for range sdCh {
select {
case <-shutdownCh:
case <-worker.ShutdownCh:
default:
close(shutdownCh)
close(worker.ShutdownCh)
}
numShutDownSig++
glog.Infoln("Caught Ctrl-C. Terminating now (this may take a few seconds)...")
Expand Down
104 changes: 104 additions & 0 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package admin

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -85,6 +86,10 @@ const (
schema: String!
}
input ExportInput {
format: String
}
input BackupInput {
destination: String!
accessKey: String
Expand All @@ -99,6 +104,30 @@ const (
message: String
}
type ExportPayload {
response: Response
}
input DrainingInput {
enable: Boolean
}
type DrainingPayload {
response: Response
}
type ShutdownPayload {
response: Response
}
input ConfigInput {
lruMb: Float
}
type ConfigPayload {
response: Response
}
type BackupPayload {
response: Response
}
Expand All @@ -110,6 +139,10 @@ const (
type Mutation {
updateGQLSchema(input: UpdateGQLSchemaInput!) : UpdateGQLSchemaPayload
export(input: ExportInput!): ExportPayload
draining(input: DrainingInput!): DrainingPayload
shutdown: ShutdownPayload
config(input: ConfigInput!): ConfigPayload
backup(input: BackupInput!) : BackupPayload
}
`
Expand Down Expand Up @@ -276,6 +309,50 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady)}
})
}).
WithMutationResolver("export", func(m schema.Mutation) resolve.MutationResolver {
export := &exportResolver{}

// export implements the mutation rewriter, executor and query executor hence its passed
// thrice here.
return resolve.NewMutationResolver(
export,
export,
export,
resolve.StdMutationCompletion(m.ResponseName()))
}).
WithMutationResolver("draining", func(m schema.Mutation) resolve.MutationResolver {
draining := &drainingResolver{}

// draining implements the mutation rewriter, executor and query executor hence its
// passed thrice here.
return resolve.NewMutationResolver(
draining,
draining,
draining,
resolve.StdMutationCompletion(m.ResponseName()))
}).
WithMutationResolver("shutdown", func(m schema.Mutation) resolve.MutationResolver {
shutdown := &shutdownResolver{}

// shutdown implements the mutation rewriter, executor and query executor hence its
// passed thrice here.
return resolve.NewMutationResolver(
shutdown,
shutdown,
shutdown,
resolve.StdMutationCompletion(m.ResponseName()))
}).
WithMutationResolver("config", func(m schema.Mutation) resolve.MutationResolver {
config := &configResolver{}

// config implements the mutation rewriter, executor and query executor hence its
// passed thrice here.
return resolve.NewMutationResolver(
config,
config,
config,
resolve.StdMutationCompletion(m.ResponseName()))
}).
WithMutationResolver("backup", func(m schema.Mutation) resolve.MutationResolver {
backup := &backupResolver{}

Expand Down Expand Up @@ -438,3 +515,30 @@ func (as *adminServer) resetSchema(gqlSchema schema.Schema) {

as.status = healthy
}

func writeResponse(m schema.Mutation, code, message string) []byte {
var buf bytes.Buffer

x.Check2(buf.WriteString(`{ "`))
x.Check2(buf.WriteString(m.SelectionSet()[0].ResponseName() + `": [{`))

for i, sel := range m.SelectionSet()[0].SelectionSet() {
var val string
switch sel.Name() {
case "code":
val = code
case "message":
val = message
}
if i != 0 {
x.Check2(buf.WriteString(","))
}
x.Check2(buf.WriteString(`"`))
x.Check2(buf.WriteString(sel.ResponseName()))
x.Check2(buf.WriteString(`":`))
x.Check2(buf.WriteString(`"` + val + `"`))
}
x.Check2(buf.WriteString("}]}"))

return buf.Bytes()
}
83 changes: 83 additions & 0 deletions graphql/admin/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package admin

import (
"context"
"encoding/json"

dgoapi "github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/worker"
"github.com/golang/glog"
)

type configResolver struct {
mutation schema.Mutation
}

type configInput struct {
LruMB float64
}

func (cr *configResolver) Rewrite(
m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) {
glog.Info("Got config request through GraphQL admin API")

cr.mutation = m
input, err := getConfigInput(m)
if err != nil {
return nil, nil, err
}

err = worker.UpdateLruMb(input.LruMB)
return nil, nil, err
}

func (cr *configResolver) FromMutationResult(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) (*gql.GraphQuery, error) {

return nil, nil
}

func (cr *configResolver) Mutate(
ctx context.Context,
query *gql.GraphQuery,
mutations []*dgoapi.Mutation) (map[string]string, map[string]interface{}, error) {

return nil, nil, nil
}

func (cr *configResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) {
buf := writeResponse(cr.mutation, "Success", "Config updated successfully")
return buf, nil
}

func getConfigInput(m schema.Mutation) (*configInput, error) {
inputArg := m.ArgValue(schema.InputArgName)
inputByts, err := json.Marshal(inputArg)
if err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

var input configInput
err = json.Unmarshal(inputByts, &input)
return &input, schema.GQLWrapf(err, "couldn't get input argument")
}
87 changes: 87 additions & 0 deletions graphql/admin/draining.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package admin

import (
"context"
"encoding/json"
"fmt"

dgoapi "github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

type drainingResolver struct {
mutation schema.Mutation
enable bool
}

type drainingInput struct {
Enable bool
}

func (dr *drainingResolver) Rewrite(
m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) {
glog.Info("Got draining request through GraphQL admin API")

dr.mutation = m
input, err := getDrainingInput(m)
if err != nil {
return nil, nil, err
}

dr.enable = input.Enable
x.UpdateDrainingMode(input.Enable)
return nil, nil, nil
}

func (dr *drainingResolver) FromMutationResult(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) (*gql.GraphQuery, error) {

return nil, nil
}

func (dr *drainingResolver) Mutate(
ctx context.Context,
query *gql.GraphQuery,
mutations []*dgoapi.Mutation) (map[string]string, map[string]interface{}, error) {

return nil, nil, nil
}

func (dr *drainingResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) {
buf := writeResponse(dr.mutation, "Success",
fmt.Sprintf("draining mode has been set to %v", dr.enable))
return buf, nil
}

func getDrainingInput(m schema.Mutation) (*drainingInput, error) {
inputArg := m.ArgValue(schema.InputArgName)
inputByts, err := json.Marshal(inputArg)
if err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

var input drainingInput
err = json.Unmarshal(inputByts, &input)
return &input, schema.GQLWrapf(err, "couldn't get input argument")
}
Loading

0 comments on commit c39bd3f

Please sign in to comment.