Skip to content

Commit

Permalink
Merge pull request #201 from porters-xyz/graceful_shutdown
Browse files Browse the repository at this point in the history
Adds graceful shutdown for proxy server and task queue
  • Loading branch information
wtfsayo committed Apr 18, 2024
2 parents 94de817 + e7e225c commit 6afa05b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 10 deletions.
10 changes: 10 additions & 0 deletions gateway/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package common

import (
"time"
)

// store config as constants for now
const (
SHUTDOWN_DELAY = 5 * time.Second
)
14 changes: 14 additions & 0 deletions gateway/common/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strconv"
"sync"
"time"
)

// TODO make it "pluggable" so different types of tasks can operate off queue
Expand Down Expand Up @@ -62,6 +63,19 @@ func (q *TaskQueue) SetupWorkers() {
// use this for graceful shutdown
func (q *TaskQueue) CloseQueue() {
close(q.Tasks)

ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if len(q.Tasks) == 0 {
return
}
case <-time.After(SHUTDOWN_DELAY):
log.Println("workers not finished, work may be lost")
return
}
}
}

func worker(q *TaskQueue) {
Expand Down
40 changes: 40 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"log"
"os"
"os/signal"
"sync"
"syscall"

"porters/common"
"porters/proxy"
)

func gateway() {
// Start job queue
common.GetTaskQueue().SetupWorkers()

log.Println("starting gateway")
proxy.Start()

done := make(chan os.Signal)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
shutdown()
}

func shutdown() {
var wg sync.WaitGroup

wg.Add(2)
go func() {
defer wg.Done()
proxy.Stop()
}()
go func() {
defer wg.Done()
common.GetTaskQueue().CloseQueue()
}()
wg.Wait()
}
8 changes: 1 addition & 7 deletions gateway/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package main

import (
"log"
"os"

"porters/common"
"porters/plugins"
"porters/proxy"
)
Expand All @@ -16,16 +14,12 @@ func main() {
arg := os.Args[1]
if arg == "gateway" {

// Start job queue
common.GetTaskQueue().SetupWorkers()

// currently registering plugins via main
proxy.Register(&plugins.Counter{})
proxy.Register(&plugins.ApiKeyAuth{"X-API"})
proxy.Register(&plugins.BalanceTracker{})
proxy.Register(&plugins.NoopFilter{proxy.LifecycleMask(proxy.AccountLookup|proxy.RateLimit)})

log.Println("starting gateway")
proxy.Start()
gateway()
}
}
26 changes: 23 additions & 3 deletions gateway/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (

"github.com/gorilla/mux"

"porters/common"
"porters/db"
)

var server *http.Server

func Start() {
// TODO grab url for gateway kit
proxyUrl := os.Getenv("PROXY_TO")
Expand Down Expand Up @@ -45,9 +48,26 @@ func Start() {
router := mux.NewRouter()
router.HandleFunc("/{appId}", handler(revProxy))
router.HandleFunc("/health", healthHandler())
err2 := http.ListenAndServe(":9000", router)
if err2 != nil {
panic(err2)

server = &http.Server{Addr: ":9000", Handler: router}
go func() {
err := server.ListenAndServe()
if err != nil {
log.Println("server error", err)
}
}()
}

func Stop() {
// 5 second shutdown
ctx, cancel := context.WithTimeout(context.Background(), common.SHUTDOWN_DELAY)
defer cancel()

err := server.Shutdown(ctx)
if err != nil {
log.Println("error shutting down", err)
} else {
log.Println("shutdown successful")
}
}

Expand Down

0 comments on commit 6afa05b

Please sign in to comment.