Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lambda): monitor lambda server, fix performance issue, remove lambda logs from extensions #8006

Merged
merged 8 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,8 @@ func main() {
"./docker-compose.yml", "name of output file")
cmd.PersistentFlags().BoolVarP(&opts.LocalBin, "local", "l", true,
"use locally-compiled binary if true, otherwise use binary from docker container")
cmd.PersistentFlags().StringVar(&opts.Image, "image", "dgraph/dgraph",
// TODO(Naman): Change this to dgraph/dgraph once the lambda changes are released.
cmd.PersistentFlags().StringVar(&opts.Image, "image", "public.ecr.aws/n1e3y0t3/dgraph-lambda",
NamanJain8 marked this conversation as resolved.
Show resolved Hide resolved
"Docker image for alphas and zeros.")
cmd.PersistentFlags().StringVarP(&opts.Tag, "tag", "t", "latest",
"Docker tag for the --image image. Requires -l=false to use binary from docker container.")
Expand Down
Empty file modified contrib/bench-lambda/load-data.sh
100644 → 100755
Empty file.
4 changes: 2 additions & 2 deletions contrib/bench-lambda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func main() {
callDgraph(client, url)
}
if num := atomic.AddInt32(&count, 1); num%1000 == 0 {
elasped := time.Since(start).Round(time.Second).Seconds()
if elasped == 0 {
elasped := time.Since(start).Seconds()
if elasped < 1 {
return
}
fmt.Printf("[Chan: %d] Done %d requests in time: %f QPS: %d\n",
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/dist/index.js

Large diffs are not rendered by default.

13 changes: 0 additions & 13 deletions dgraph/cmd/alpha/dist/index.js.LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@
* MIT Licensed
*/

/*!
* connect-timeout
* Copyright(c) 2014 Jonathan Ong
* Copyright(c) 2014-2015 Douglas Christopher Wilson
* MIT Licensed
*/

/*!
* content-disposition
* Copyright(c) 2014-2017 Douglas Christopher Wilson
Expand Down Expand Up @@ -196,12 +189,6 @@
* MIT Licensed
*/

/*!
* on-headers
* Copyright(c) 2014 Douglas Christopher Wilson
* MIT Licensed
*/

/*!
* parseurl
* Copyright(c) 2014 Jonathan Ong
Expand Down
83 changes: 69 additions & 14 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"math"
"net"
"net/http"
_ "net/http/pprof" // http profiler
_ "net/http/pprof" // http profile
"net/url"
"os"
"os/exec"
Expand Down Expand Up @@ -243,6 +243,8 @@ they form a Raft group and provide synchronous replication.
"Number of JS lambda servers to be launched by alpha.").
Flag("port",
"The starting port at which the lambda server listens.").
Flag("restart-after",
"Restarts the lambda server after given duration of unresponsiveness").
String())

flag.String("cdc", worker.CDCDefaults, z.NewSuperFlagHelp(worker.CDCDefaults).
Expand Down Expand Up @@ -468,11 +470,7 @@ func setupLambdaServer(closer *z.Closer) {
return
}

glog.Infoln("Setting up lambda servers")
dgraphUrl := fmt.Sprintf("http://localhost:%d", httpPort())
// Entry point of the script is index.js.
filename := filepath.Join(x.WorkerConfig.TmpDir, "index.js")

// Copy over all the embedded files to actual files.
dir := "dist"
files, err := jsLambda.ReadDir(dir)
x.Check(err)
Expand All @@ -488,28 +486,84 @@ func setupLambdaServer(closer *z.Closer) {
x.Check(file.Close())
}

type lambda struct {
cmd *exec.Cmd
active bool
lastActive int64
health string
port int
}

lambdas := make([]*lambda, 0, num)
for i := 0; i < num; i++ {
lambdas = append(lambdas, &lambda{
port: port + i,
health: fmt.Sprintf("http://127.0.0.1:%d/health", port+i),
})
}

// Entry point of the script is index.js.
filename := filepath.Join(x.WorkerConfig.TmpDir, "index.js")
dgraphUrl := fmt.Sprintf("http://127.0.0.1:%d", httpPort())

glog.Infoln("Setting up lambda servers")
for i := range lambdas {
go func(i int) {
for {
select {
case <-closer.HasBeenClosed():
break
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will never reach here untill the process is running. I believe thats the expected behavior. Just wanted to point this out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. This is added so that alpha does not try to restart the lambda while the alpha is being shut down.

default:
cmd := exec.CommandContext(closer.Ctx(), "node", filename)
cmd.Env = append(cmd.Env, fmt.Sprintf("PORT=%d", port+i))
cmd.Env = append(cmd.Env, fmt.Sprintf("PORT=%d", lambdas[i].port))
cmd.Env = append(cmd.Env, fmt.Sprintf("DGRAPH_URL="+dgraphUrl))
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
lambdas[i].cmd = cmd
lambdas[i].lastActive = time.Now().UnixNano()
lambdas[i].active = true
glog.Infof("Running node command: %+v\n", cmd)
err := cmd.Run()
if err != nil {
glog.Errorf("Lambda server idx: %d stopped with error %v", i, err)
if err := cmd.Run(); err != nil {
glog.Errorf("Lambda server at port: %d stopped with error: %v",
lambdas[i].port, err)
}
time.Sleep(2 * time.Second)
}
}
}(i)
}

// Monitor the lambda servers. If the server is unresponsive for more than restart-after time,
// restart it.
client := http.Client{Timeout: 1 * time.Second}
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-closer.HasBeenClosed():
return
case <-ticker.C:
timestamp := time.Now().UnixNano()
for _, l := range lambdas {
if !l.active {
continue
}
resp, err := client.Get(l.health)
if err != nil || resp.StatusCode != 200 {
if time.Duration(timestamp-l.lastActive) > x.Config.Lambda.RestartAfter {
glog.Warningf("Lambda Server at port: %d not responding."+
" Killed it with err: %v", l.port, l.cmd.Process.Kill())
l.active = false
}
continue
}
resp.Body.Close()
l.lastActive = timestamp
}
}
}
}()
}

func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {
Expand Down Expand Up @@ -809,9 +863,10 @@ func run() {
lambda := z.NewSuperFlag(Alpha.Conf.GetString("lambda")).MergeAndCheckDefault(
worker.LambdaDefaults)
x.Config.Lambda = x.LambdaOptions{
Url: lambda.GetString("url"),
Num: lambda.GetUint32("num"),
Port: lambda.GetUint32("port"),
Url: lambda.GetString("url"),
Num: lambda.GetUint32("num"),
Port: lambda.GetUint32("port"),
RestartAfter: lambda.GetDuration("restart-after"),
}
if x.Config.Lambda.Url != "" {
graphqlLambdaUrl, err := url.Parse(x.Config.Lambda.Url)
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func processToFastJSON(q string) string {
log.Fatal(err)
}

buf, _, err := query.ToJson(context.Background(), &l, qr.Subgraphs, nil)
buf, err := query.ToJson(context.Background(), &l, qr.Subgraphs, nil)
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 1 addition & 3 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,6 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
return resp, errors.Wrap(err, "")
}

var logs []string
if len(er.SchemaNode) > 0 || len(er.Types) > 0 {
if err = authorizeSchemaQuery(ctx, &er); err != nil {
return resp, err
Expand All @@ -1587,7 +1586,7 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)
} else if qc.req.RespFormat == api.Request_RDF {
resp.Rdf, err = query.ToRDF(qc.latency, er.Subgraphs)
} else {
resp.Json, logs, err = query.ToJson(ctx, qc.latency, er.Subgraphs, qc.gqlField)
resp.Json, err = query.ToJson(ctx, qc.latency, er.Subgraphs, qc.gqlField)
}
// if err is just some error from GraphQL encoding, then we need to continue the normal
// execution ignoring the error as we still need to assign metrics and latency info to resp.
Expand Down Expand Up @@ -1642,7 +1641,6 @@ func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error)

resp.Metrics = &api.Metrics{
NumUids: er.Metrics,
Logs: logs,
}

var total uint64
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e
github.com/dgraph-io/dgo/v210 v210.0.0-20210825123656-d3f867fe9cc3
github.com/dgraph-io/dgo/v210 v210.0.0-20210421093152-78a2fece3ebd
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Ev
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e h1:lugmhvI1tMal0wKW0g5uxIRHUqXpE5y1lgq/vm/UP/8=
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/dgraph-io/dgo/v210 v210.0.0-20210825123656-d3f867fe9cc3 h1:/S7Dett03h3+KWRenJeuKE/1jZv76MaB9C1mbR/1Tns=
github.com/dgraph-io/dgo/v210 v210.0.0-20210825123656-d3f867fe9cc3/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s=
github.com/dgraph-io/dgo/v210 v210.0.0-20210421093152-78a2fece3ebd h1:bKck5FnruuJxL1oCmrDSYWRl634IxBwL/IwwWx4UgEM=
github.com/dgraph-io/dgo/v210 v210.0.0-20210421093152-78a2fece3ebd/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
Expand Down
5 changes: 3 additions & 2 deletions graphql/e2e/directives/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.5"
services:
zero1:
image: namanj1811/dgraph:lambda
image: dgraph/dgraph:latest
working_dir: /data/zero1
ports:
- 5080
Expand All @@ -17,7 +17,8 @@ services:
command: /gobin/dgraph zero --logtostderr -v=2 --bindall --expose_trace --profile_mode block --block_rate 10 --my=zero1:5080

alpha1:
image: namanj1811/dgraph:lambda
# TODO(Naman): Change this to dgraph/dgraph once the lambda changes are released.
image: public.ecr.aws/n1e3y0t3/dgraph-lambda:latest
working_dir: /data/alpha1
volumes:
- type: bind
Expand Down
5 changes: 3 additions & 2 deletions graphql/e2e/normal/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.5"
services:
zero1:
image: namanj1811/dgraph:lambda
image: dgraph/dgraph:latest
working_dir: /data/zero1
ports:
- 5080
Expand All @@ -17,7 +17,8 @@ services:
command: /gobin/dgraph zero --logtostderr -v=2 --bindall --expose_trace --profile_mode block --block_rate 10 --my=zero1:5080

alpha1:
image: namanj1811/dgraph:lambda
# TODO(Naman): Change this to dgraph/dgraph once the lambda changes are released.
image: public.ecr.aws/n1e3y0t3/dgraph-lambda:latest
working_dir: /data/alpha1
volumes:
- type: bind
Expand Down
5 changes: 0 additions & 5 deletions graphql/resolve/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func (mr *dgraphResolver) rewriteAndExecute(
},
},
},
Logs: make([]string, 0),
}

emptyResult := func(err error) *Resolved {
Expand Down Expand Up @@ -296,7 +295,6 @@ func (mr *dgraphResolver) rewriteAndExecute(
return emptyResult(gqlErr), resolverFailed
}
ext.TouchedUids += mutResp.GetMetrics().GetNumUids()[touchedUidsKey]
ext.Logs = append(ext.Logs, mutResp.GetMetrics().GetLogs()...)
}

// Parse the result of query.
Expand Down Expand Up @@ -406,7 +404,6 @@ func (mr *dgraphResolver) rewriteAndExecute(
queryErrs = err
}
ext.TouchedUids += qryResp.GetMetrics().GetNumUids()[touchedUidsKey]
ext.Logs = append(ext.Logs, qryResp.GetMetrics().GetLogs()...)
}
}

Expand All @@ -428,7 +425,6 @@ func (mr *dgraphResolver) rewriteAndExecute(
}

ext.TouchedUids += mutResp.GetMetrics().GetNumUids()[touchedUidsKey]
ext.Logs = append(ext.Logs, mutResp.GetMetrics().GetLogs()...)
if req.Query != "" && len(mutResp.GetJson()) != 0 {
if err := json.Unmarshal(mutResp.GetJson(), &result); err != nil {
return emptyResult(
Expand Down Expand Up @@ -525,7 +521,6 @@ func (mr *dgraphResolver) rewriteAndExecute(
}
queryErrs = schema.AppendGQLErrs(queryErrs, err)
ext.TouchedUids += qryResp.GetMetrics().GetNumUids()[touchedUidsKey]
ext.Logs = append(ext.Logs, qryResp.GetMetrics().GetLogs()...)
}
numUids := getNumUids(mutation, mutResp.Uids, result)

Expand Down
2 changes: 0 additions & 2 deletions graphql/resolve/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func (qr *queryResolver) rewriteAndExecute(ctx context.Context, query schema.Que
}

ext.TouchedUids = resp.GetMetrics().GetNumUids()[touchedUidsKey]
ext.Logs = append(ext.Logs, resp.GetMetrics().GetLogs()...)
resolved := &Resolved{
Data: resp.GetJson(),
Field: query,
Expand Down Expand Up @@ -222,7 +221,6 @@ func (qr *customDQLQueryResolver) rewriteAndExecute(ctx context.Context,
return emptyResult(schema.GQLWrapf(err, "Dgraph query failed"))
}
ext.TouchedUids = resp.GetMetrics().GetNumUids()[touchedUidsKey]
ext.Logs = append(ext.Logs, resp.GetMetrics().GetLogs()...)

var respJson map[string]interface{}
if err = schema.Unmarshal(resp.Json, &respJson); err != nil {
Expand Down
34 changes: 1 addition & 33 deletions graphql/resolve/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"net/http"
"reflect"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -723,31 +722,7 @@ func (hr *httpResolver) rewriteAndExecute(ctx context.Context, field schema.Fiel
}
}

// Extract the logs from the lambda response and propagate it via extensions.
var ext *schema.Extensions
if field.HasLambdaDirective() {
res, ok := fieldData.(map[string]interface{})
if !ok {
return &Resolved{
Data: field.NullResponse(),
Field: field,
Err: field.GqlErrorf(nil, "Evaluation of lambda resolver failed because expected"+
" result of lambda to be map type, got: %+v for field: %s within type: %s.",
res, field.Name(), field.GetObjectName()),
}
}
logs, ok := res["logs"].(string)
if !ok {
glog.Errorf("Expected lambda logs of type string, got %v for field: %s",
reflect.TypeOf(res).Name(), field.Name())
}
ext = &schema.Extensions{
Logs: []string{logs},
}
fieldData = res["res"]
}

return DataResultWithExt(field, map[string]interface{}{field.Name(): fieldData}, errs, ext)
return DataResult(field, map[string]interface{}{field.Name(): fieldData}, errs)
}

func (h *httpQueryResolver) Resolve(ctx context.Context, query schema.Query) *Resolved {
Expand Down Expand Up @@ -778,13 +753,6 @@ func DataResult(f schema.Field, data map[string]interface{}, err error) *Resolve
}
}

func DataResultWithExt(
f schema.Field, data map[string]interface{}, err error, ext *schema.Extensions) *Resolved {
r := DataResult(f, data, err)
r.Extensions = ext
return r
}

func newtimer(ctx context.Context, Duration *schema.OffsetDuration) schema.OffsetTimer {
resolveStartTime, _ := ctx.Value(resolveStartTime).(time.Time)
tf := schema.NewOffsetTimerFactory(resolveStartTime)
Expand Down
Loading