Skip to content

Commit

Permalink
Implement improved signal catching in services that already use it (#…
Browse files Browse the repository at this point in the history
…2333)

Implements a less RPC focused signal catch/shutdown method. Certain things that probably could also use this (i.e. `ocsp-updater`) haven't been given it as they would require rather substantial changes to allow for a graceful shutdown approach.

Fixes #2298.
  • Loading branch information
Roland Bracewell Shoemaker authored and cpu committed Nov 19, 2016
1 parent d52c13f commit 595204b
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 30 deletions.
11 changes: 11 additions & 0 deletions cmd/boulder-ca/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cloudflare/cfssl/helpers"
"github.com/jmhodges/clock"
"github.com/letsencrypt/pkcs11key"
"google.golang.org/grpc"

"github.com/letsencrypt/boulder/ca"
caPB "github.com/letsencrypt/boulder/ca/proto"
Expand Down Expand Up @@ -176,6 +177,7 @@ func main() {
cmd.FailOnError(err, "Failed to create Publisher client")
}

var grpcSrv *grpc.Server
if c.CA.GRPC != nil {
s, l, err := bgrpc.NewServer(c.CA.GRPC, scope)
cmd.FailOnError(err, "Unable to setup CA gRPC server")
Expand All @@ -185,10 +187,19 @@ func main() {
err = s.Serve(l)
cmd.FailOnError(err, "CA gRPC service failed")
}()
grpcSrv = s
}

cas, err := rpc.NewAmqpRPCServer(amqpConf, c.CA.MaxConcurrentRPCServerRequests, scope, logger)
cmd.FailOnError(err, "Unable to create CA RPC server")

go cmd.CatchSignals(logger, func() {
cas.Stop()
if grpcSrv != nil {
grpcSrv.GracefulStop()
}
})

err = rpc.NewCertificateAuthorityServer(cas, cai)
cmd.FailOnError(err, "Failed to create Certificate Authority RPC server")

Expand Down
11 changes: 11 additions & 0 deletions cmd/boulder-publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"

ct "github.com/google/certificate-transparency/go"
"google.golang.org/grpc"

"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
Expand Down Expand Up @@ -82,6 +83,7 @@ func main() {
scope,
sa)

var grpcSrv *grpc.Server
if c.Publisher.GRPC != nil {
s, l, err := bgrpc.NewServer(c.Publisher.GRPC, scope)
cmd.FailOnError(err, "Failed to setup gRPC server")
Expand All @@ -91,10 +93,19 @@ func main() {
err = s.Serve(l)
cmd.FailOnError(err, "gRPC service failed")
}()
grpcSrv = s
}

pubs, err := rpc.NewAmqpRPCServer(amqpConf, c.Publisher.MaxConcurrentRPCServerRequests, scope, logger)
cmd.FailOnError(err, "Unable to create Publisher RPC server")

go cmd.CatchSignals(logger, func() {
pubs.Stop()
if grpcSrv != nil {
grpcSrv.GracefulStop()
}
})

err = rpc.NewPublisherServer(pubs, pubi)
cmd.FailOnError(err, "Unable to setup Publisher RPC server")

Expand Down
3 changes: 3 additions & 0 deletions cmd/boulder-ra/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ func main() {

ras, err := rpc.NewAmqpRPCServer(amqpConf, c.RA.MaxConcurrentRPCServerRequests, scope, logger)
cmd.FailOnError(err, "Unable to create RA RPC server")

go cmd.CatchSignals(logger, ras.Stop)

err = rpc.NewRegistrationAuthorityServer(ras, rai, logger)
cmd.FailOnError(err, "Unable to setup RA RPC server")

Expand Down
2 changes: 2 additions & 0 deletions cmd/boulder-sa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func main() {
sas, err := rpc.NewAmqpRPCServer(amqpConf, c.SA.MaxConcurrentRPCServerRequests, scope, logger)
cmd.FailOnError(err, "Unable to create SA RPC server")

go cmd.CatchSignals(logger, sas.Stop)

err = rpc.NewStorageAuthorityServer(sas, sai)
cmd.FailOnError(err, "Unable to setup SA RPC server")

Expand Down
11 changes: 11 additions & 0 deletions cmd/boulder-va/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/jmhodges/clock"
"google.golang.org/grpc"

"github.com/letsencrypt/boulder/bdns"
"github.com/letsencrypt/boulder/cdr"
Expand Down Expand Up @@ -147,6 +148,7 @@ func main() {
logger)

amqpConf := c.VA.AMQP
var grpcSrv *grpc.Server
if c.VA.GRPC != nil {
s, l, err := bgrpc.NewServer(c.VA.GRPC, scope)
cmd.FailOnError(err, "Unable to setup VA gRPC server")
Expand All @@ -156,10 +158,19 @@ func main() {
err = s.Serve(l)
cmd.FailOnError(err, "VA gRPC service failed")
}()
grpcSrv = s
}

vas, err := rpc.NewAmqpRPCServer(amqpConf, c.VA.MaxConcurrentRPCServerRequests, scope, logger)
cmd.FailOnError(err, "Unable to create VA RPC server")

go cmd.CatchSignals(logger, func() {
vas.Stop()
if grpcSrv != nil {
grpcSrv.GracefulStop()
}
})

err = rpc.NewValidationAuthorityServer(vas, vai)
cmd.FailOnError(err, "Unable to setup VA RPC server")

Expand Down
7 changes: 6 additions & 1 deletion cmd/boulder-wfe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func main() {
KillTimeout: c.WFE.ShutdownKillTimeout.Duration,
Stats: metrics.NewFBAdapter(scope, clock.Default()),
}
err = httpdown.ListenAndServe(srv, hd)
hdSrv, err := hd.ListenAndServe(srv)
cmd.FailOnError(err, "Error starting HTTP server")

go cmd.CatchSignals(logger, func() { _ = hdSrv.Stop() })

forever := make(chan struct{}, 1)
<-forever
}
13 changes: 7 additions & 6 deletions cmd/caa-checker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"sync"

"github.com/cactus/go-statsd-client/statsd"
"github.com/jmhodges/clock"
"github.com/miekg/dns"
"golang.org/x/net/context"
Expand Down Expand Up @@ -205,14 +204,14 @@ func (ccs *caaCheckerServer) ValidForIssuance(ctx context.Context, check *pb.Che
}

type config struct {
GRPC cmd.GRPCServerConfig
GRPC cmd.GRPCServerConfig
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig

DebugAddr string `yaml:"debug-addr"`
DNSResolver string `yaml:"dns-resolver"`
DNSNetwork string `yaml:"dns-network"`
DNSTimeout cmd.ConfigDuration `yaml:"dns-timeout"`
StatsdServer string `yaml:"statsd-server"`
StatsdPrefix string `yaml:"statsd-prefix"`
CAASERVFAILExceptions string `yaml:"caa-servfail-exceptions"`
}

Expand All @@ -226,9 +225,10 @@ func main() {
err = yaml.Unmarshal(configBytes, &c)
cmd.FailOnError(err, fmt.Sprintf("Failed to parse configuration file from '%s'", *configPath))

stats, err := statsd.NewClient(c.StatsdServer, c.StatsdPrefix)
cmd.FailOnError(err, "Failed to create StatsD client")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "CAAService")
defer logger.AuditPanic()
logger.Info(cmd.VersionString("CAA-Checker"))

caaSERVFAILExceptions, err := bdns.ReadHostList(c.CAASERVFAILExceptions)
cmd.FailOnError(err, "Couldn't read CAASERVFAILExceptions file")
Expand All @@ -247,6 +247,7 @@ func main() {
ccs := &caaCheckerServer{resolver, scope}
pb.RegisterCAACheckerServer(s, ccs)

go cmd.CatchSignals(logger, s.GracefulStop)
go cmd.DebugServer(c.DebugAddr)

err = s.Serve(l)
Expand Down
7 changes: 6 additions & 1 deletion cmd/ocsp-responder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,13 @@ as generated by Boulder's single-ocsp command.
KillTimeout: killTimeout,
Stats: metrics.NewFBAdapter(scope, clock.Default()),
}
err = httpdown.ListenAndServe(srv, hd)
hdSrv, err := hd.ListenAndServe(srv)
cmd.FailOnError(err, "Error starting HTTP server")

go cmd.CatchSignals(logger, func() { _ = hdSrv.Stop() })

forever := make(chan struct{}, 1)
<-forever
}

func mux(scope metrics.Scope, responderPath string, source cfocsp.Source) http.Handler {
Expand Down
27 changes: 27 additions & 0 deletions cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"net/http"
_ "net/http/pprof" // HTTP performance profiling, added transparently to HTTP APIs
"os"
"os/signal"
"path"
"runtime"
"syscall"
"time"

cfsslLog "github.com/cloudflare/cfssl/log"
Expand Down Expand Up @@ -225,3 +227,28 @@ func ReadConfigFile(filename string, out interface{}) error {
func VersionString(name string) string {
return fmt.Sprintf("Versions: %s=(%s %s) Golang=(%s) BuildHost=(%s)", name, core.GetBuildID(), core.GetBuildTime(), runtime.Version(), core.GetBuildHost())
}

var signalToName = map[os.Signal]string{
syscall.SIGTERM: "SIGTERM",
syscall.SIGINT: "SIGINT",
syscall.SIGHUP: "SIGHUP",
}

// CatchSignals catches SIGTERM, SIGINT, SIGHUP and executes a callback
// method before exiting
func CatchSignals(logger blog.Logger, callback func()) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGHUP)

sig := <-sigChan
logger.Info(fmt.Sprintf("Caught %s", signalToName[sig]))

if callback != nil {
callback()
}

logger.Info("Exiting")
os.Exit(0)
}
22 changes: 0 additions & 22 deletions rpc/amqp-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import (
"fmt"
"io/ioutil"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/jmhodges/clock"
Expand Down Expand Up @@ -406,8 +404,6 @@ func (rpc *AmqpRPCServer) Start(c *cmd.AMQPConfig) error {
rpc.connected = true
rpc.mu.Unlock()

go rpc.catchSignals()

for {
select {
case msg, ok := <-rpc.connection.messages():
Expand Down Expand Up @@ -448,24 +444,6 @@ func (rpc *AmqpRPCServer) Start(c *cmd.AMQPConfig) error {
}
}

var signalToName = map[os.Signal]string{
syscall.SIGTERM: "SIGTERM",
syscall.SIGINT: "SIGINT",
syscall.SIGHUP: "SIGHUP",
}

func (rpc *AmqpRPCServer) catchSignals() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGHUP)

sig := <-sigChan
rpc.log.Info(fmt.Sprintf(" [!] Caught %s", signalToName[sig]))
rpc.Stop()
signal.Stop(sigChan)
}

// Stop gracefully stops the AmqpRPCServer, after calling AmqpRPCServer.Start will
// continue blocking until it has processed any messages that have already been
// retrieved.
Expand Down

0 comments on commit 595204b

Please sign in to comment.