Skip to content

Commit

Permalink
some changes for more idiomatic code
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Baker committed Dec 12, 2018
1 parent 33b9f9d commit 3ee692c
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ const (

type rpcHandler struct {
*Server
logger log.Logger
gologger *golog.Logger
acceptLoopDelay time.Duration
logger log.Logger
gologger *golog.Logger
}

func newRpcHandler(s *Server) *rpcHandler {
Expand Down Expand Up @@ -85,6 +84,8 @@ type RPCContext struct {
// listen is used to listen for incoming RPC connections
func (r *rpcHandler) listen(ctx context.Context) {
defer close(r.listenerCh)

var acceptLoopDelay time.Duration
for {
select {
case <-ctx.Done():
Expand All @@ -99,41 +100,46 @@ func (r *rpcHandler) listen(ctx context.Context) {
if r.shutdown {
return
}
r.handleAcceptErr(err, ctx)
r.handleAcceptErr(ctx, err, &acceptLoopDelay)
continue
}
// No error, reset loop delay
r.acceptLoopDelay = 0
acceptLoopDelay = 0

go r.handleConn(ctx, conn, &RPCContext{Conn: conn})
metrics.IncrCounter([]string{"nomad", "rpc", "accept_conn"}, 1)
}
}

// Sleep to avoid spamming the log, with a maximum delay according to whether or not the error is temporary
func (r *rpcHandler) handleAcceptErr(err error, ctx context.Context) {
const baseAcceptLoopDelay = 5 * time.Millisecond
const maxAcceptLoopDelay = 5 * time.Second
const maxAcceptLoopDelayTemporaryError = 1 * time.Second
// handleAcceptErr sleeps to avoid spamming the log,
// with a maximum delay according to whether or not the error is temporary
func (r *rpcHandler) handleAcceptErr(ctx context.Context, err error, loopDelay *time.Duration) {
const baseDelay = 5 * time.Millisecond
const maxDelayPerm = 5 * time.Second
const maxDelayTemp = 1 * time.Second

if r.acceptLoopDelay == 0 {
r.acceptLoopDelay = baseAcceptLoopDelay
if *loopDelay == 0 {
*loopDelay = baseDelay
} else {
r.acceptLoopDelay *= 2
*loopDelay *= 2
}

temporaryError := false
if ne, ok := err.(net.Error); ok && ne.Temporary() {
temporaryError = true
}
if temporaryError && r.acceptLoopDelay > maxAcceptLoopDelayTemporaryError {
r.acceptLoopDelay = maxAcceptLoopDelayTemporaryError
} else if r.acceptLoopDelay > maxAcceptLoopDelay {
r.acceptLoopDelay = maxAcceptLoopDelay

if temporaryError && *loopDelay > maxDelayTemp {
*loopDelay = maxDelayTemp
} else if *loopDelay > maxDelayPerm {
*loopDelay = maxDelayPerm
}
r.logger.Error("failed to accept RPC conn", "error", err, "delay", r.acceptLoopDelay)

r.logger.Error("failed to accept RPC conn", "error", err, "delay", *loopDelay)

select {
case <-ctx.Done():
case <-time.After(r.acceptLoopDelay):
case <-time.After(*loopDelay):
}
}

Expand Down

0 comments on commit 3ee692c

Please sign in to comment.