Skip to content

Commit

Permalink
feat(worker): support graceful shutdown (#459)
Browse files Browse the repository at this point in the history
* feat(worker): support graceful shutdown

notifications workers and queue have been sent to APNs/FCM before shutdown a push notification.

send buffered channel to signal.Notify to avoid blocking

see: golang/lint#175

fixed: #441

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy authored Feb 4, 2020
1 parent bcf1c0c commit 2d2a8a0
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ build_linux_lambda:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -tags 'lambda' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)' -o release/linux/lambda/$(DEPLOY_IMAGE)

docker_image:
docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f Dockerfile .
docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f ./docker/Dockerfile.linux.amd64 .

docker_release: docker_image

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ A push notification micro server using [Gin](https://github.com/gin-gonic/gin) f
- Support install TLS certificates from [Let's Encrypt](https://letsencrypt.org/) automatically.
- Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework.
- Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/))
- Support graceful shutdown that notifications workers and queue have are sent to APNs/FCM before a push notification service is shutdown.

See the default [YAML config example](config/config.yml):

Expand Down
15 changes: 15 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: '3'

services:
gorush:
image: appleboy/gorush
restart: always
ports:
- "8080:8080"
- "9000:9000"
logging:
options:
max-size: "100k"
max-file: "3"
environment:
- GORUSH_CORE_QUEUE_NUM=512
7 changes: 6 additions & 1 deletion gorush/notification_fcm_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package gorush

import (
"context"
"log"
"os"
"sync"
"testing"

"github.com/appleboy/go-fcm"
Expand All @@ -16,7 +18,10 @@ func init() {
log.Fatal(err)
}

InitWorkers(PushConf.Core.WorkerNum, PushConf.Core.QueueNum)
ctx := context.Background()
wg := &sync.WaitGroup{}
wg.Add(int(PushConf.Core.WorkerNum))
InitWorkers(ctx, wg, PushConf.Core.WorkerNum, PushConf.Core.QueueNum)

if err := InitAppStatus(); err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion gorush/server_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func RunHTTPServer() error {
return nil
}

LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.")
LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")

return gateway.ListenAndServe(PushConf.Core.Address+":"+PushConf.Core.Port, routerEngine())
}
2 changes: 1 addition & 1 deletion gorush/server_normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func RunHTTPServer() (err error) {
Handler: routerEngine(),
}

LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.")
LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")
if PushConf.Core.AutoTLS.Enabled {
return startServer(autoTLSServer())
} else if PushConf.Core.SSL {
Expand Down
3 changes: 2 additions & 1 deletion gorush/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/appleboy/gorush/storage/leveldb"
"github.com/appleboy/gorush/storage/memory"
"github.com/appleboy/gorush/storage/redis"

"github.com/gin-gonic/gin"
"github.com/thoas/stats"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ type IosStatus struct {

// InitAppStatus for initialize app status
func InitAppStatus() error {
LogAccess.Debug("Init App Status Engine as ", PushConf.Stat.Engine)
LogAccess.Info("Init App Status Engine as ", PushConf.Stat.Engine)
switch PushConf.Stat.Engine {
case "memory":
StatStorage = memory.New()
Expand Down
13 changes: 7 additions & 6 deletions gorush/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
)

// InitWorkers for initialize all workers.
func InitWorkers(workerNum int64, queueNum int64) {
LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum)
func InitWorkers(ctx context.Context, wg *sync.WaitGroup, workerNum int64, queueNum int64) {
LogAccess.Info("worker number is ", workerNum, ", queue number is ", queueNum)
QueueNotification = make(chan PushNotification, queueNum)
for i := int64(0); i < workerNum; i++ {
go startWorker()
go startWorker(ctx, wg, i)
}
}

Expand All @@ -33,11 +33,12 @@ func SendNotification(req PushNotification) {
}
}

func startWorker() {
for {
notification := <-QueueNotification
func startWorker(ctx context.Context, wg *sync.WaitGroup, num int64) {
defer wg.Done()
for notification := range QueueNotification {
SendNotification(notification)
}
LogAccess.Info("closed the worker num ", num)
}

// markFailedNotification adds failure logs for all tokens in push notification
Expand Down
48 changes: 40 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"

"github.com/appleboy/gorush/config"
Expand All @@ -18,6 +22,24 @@ import (
"golang.org/x/sync/errgroup"
)

func withContextFunc(ctx context.Context, f func()) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(c)

select {
case <-ctx.Done():
case <-c:
cancel()
f()
}
}()

return ctx
}

func main() {
opts := config.ConfYaml{}

Expand Down Expand Up @@ -223,19 +245,29 @@ func main() {
}

if err = gorush.InitAppStatus(); err != nil {
return
gorush.LogError.Fatal(err)
}

gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)
wg := &sync.WaitGroup{}
wg.Add(int(gorush.PushConf.Core.WorkerNum))
ctx := withContextFunc(context.Background(), func() {
gorush.LogAccess.Info("close the notification queue channel")
close(gorush.QueueNotification)
wg.Wait()
gorush.LogAccess.Info("the notification queue has been clear")
})

var g errgroup.Group
gorush.InitWorkers(ctx, wg, gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)

g.Go(gorush.InitAPNSClient)
if err = gorush.InitAPNSClient(); err != nil {
gorush.LogError.Fatal(err)
}

g.Go(func() error {
_, err := gorush.InitFCMClient(gorush.PushConf.Android.APIKey)
return err
})
if _, err = gorush.InitFCMClient(gorush.PushConf.Android.APIKey); err != nil {
gorush.LogError.Fatal(err)
}

var g errgroup.Group

g.Go(gorush.RunHTTPServer) // Run httpd server
g.Go(rpc.RunGRPCServer) // Run gRPC internal server
Expand Down
2 changes: 1 addition & 1 deletion rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot
// RunGRPCServer run gorush grpc server
func RunGRPCServer() error {
if !gorush.PushConf.GRPC.Enabled {
gorush.LogAccess.Debug("gRPC server is disabled.")
gorush.LogAccess.Info("gRPC server is disabled.")
return nil
}

Expand Down

0 comments on commit 2d2a8a0

Please sign in to comment.