Skip to content

Commit

Permalink
Add jitter to sleep (#39)
Browse files Browse the repository at this point in the history
Add a `sleep-jitter` config/flag to the sleep interval, so that when deployed
at large scale it reduces the likelihood of
all the servers hitting the remote endpoint at once.

In this change, `runChan` is replaced with a lambda function `runOnce`,
which is a non-blocking function
that triggers a single run and returns immediately if ansible is already running.
This removes the possibility of multiple
ansible runs queuing up and running back to back.

When sleep jitter is specified, the puller sleeps for a duration drawn
from the uniform distribution
`U(sleep - jitter, sleep + jitter)`, where `jitter` is the `sleep-jitter` value. Otherwise the behavior remains
unchanged.
  • Loading branch information
ashi009 committed Aug 15, 2023
1 parent cd825c6 commit d58859c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
12 changes: 6 additions & 6 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ var (
ansibleController string
)

// MakeChannelHandler returns an http handler that populates a channel with a single `true` when the handler is invoked
func MakeChannelHandler(c chan bool) func(http.ResponseWriter, *http.Request) {
// MakeRunOnceHandler returns an http handler that calls runOnce when invoked.
func MakeRunOnceHandler(runOnce func()) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
c <- true
runOnce()
http.Redirect(w, r, httpPathAnsibleControl, http.StatusFound)
}
}
Expand Down Expand Up @@ -114,13 +114,13 @@ func HandlerStatus(w http.ResponseWriter, r *http.Request) {

// NewServer creates a new http server
//
// runChan is a channel that we will write to when the adhocTrigger handler is invoked.
func NewServer(runChan chan bool) *http.Server {
// runOnce is a function that will be called when the adhocTrigger handler is invoked.
func NewServer(runOnce func()) *http.Server {
r := mux.NewRouter()

r.Handle("/metrics", promhttp.Handler())
r.HandleFunc("/", HandlerIndex).Methods("GET")
r.HandleFunc(httpPathAnsibleAdhocTrigger, MakeChannelHandler(runChan)).Methods("POST")
r.HandleFunc(httpPathAnsibleAdhocTrigger, MakeRunOnceHandler(runOnce)).Methods("POST")
r.HandleFunc(httpPathAnsibleDisable, HandlerAnsibleDisable).Methods("POST")
r.HandleFunc(httpPathAnsibleEnable, HandlerAnsibleEnable).Methods("POST")
r.HandleFunc(httpPathAnsibleControl, HandlerAnsibleControl).Methods("GET")
Expand Down
42 changes: 30 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -101,6 +102,7 @@ func init() {
pflag.String("venv-requirements-file", "requirements.txt", "Relative path in the pulled tarball of the requirements file to populate the virtual environment")

pflag.Int("sleep", 30, "Number of minutes to sleep between runs")
pflag.Int("sleep-jitter", 0, "Number of maxium minutes to jitter between runs. When set, the actual sleep time between each run will be uniformly distributed between [sleep-jitter, sleep+jitter)")
pflag.Bool("start-disabled", false, "Whether or not to start the server disabled")
pflag.Bool("debug", false, "Start the server in debug mode")
pflag.Bool("once", false, "Run Ansible Puller just once, then exit")
Expand Down Expand Up @@ -296,24 +298,40 @@ func main() {
promVersion.WithLabelValues(Version).Set(1)

period := time.Duration(viper.GetInt("sleep")) * time.Minute
tickerChan := time.NewTicker(period).C
runChan := make(chan bool, 8)
jitter := time.Duration(viper.GetInt("sleep-jitter")) * time.Minute

if jitter >= period {
logrus.Fatalf("sleep-jitter is too large, it must be less than the 'sleep' period %d", viper.GetInt("sleep"))
}

runChan := make(chan bool)
runOnce := func() {
// Non-blocking send to the run channel. If it's already running, this will be a no-op.
select {
case runChan <- true:
default:
}
}

go func() {
// Blocking wait for the timer to tick, then send a notification down the run channel
// This will tie the timer and ad-hoc jobs to the same channel so that we can simplify run triggers
runOnce()
if jitter == 0 {
for range time.Tick(period) {
runOnce()
}
return
}
rng := rand.New(rand.NewSource(time.Now().Unix()))
for {
<-tickerChan
runChan <- true
// Sleep for a random duration in [period - jitter, period + jitter).
time.Sleep(period - jitter + time.Duration(rng.Int63n(2*int64(jitter))))
runOnce()
}
}()

go func() {
logrus.Infoln(fmt.Sprintf("Launching Ansible Runner. Runs %d minutes apart.", viper.GetInt("sleep")))
runChan <- true
for {
<-runChan

logrus.Infoln(fmt.Sprintf("Launching Ansible Runner. Runs %d minutes (with %d mintues jitter) apart.", viper.GetInt("sleep"), viper.GetInt("sleep-jitter")))
for range runChan {
start := time.Now()
err := ansibleRun()
elapsed := time.Since(start)
Expand All @@ -329,7 +347,7 @@ func main() {
}
}()

srv := NewServer(runChan)
srv := NewServer(runOnce)
logrus.Infoln("Starting server on " + viper.GetString("http-listen-string"))
logrus.Fatal(srv.ListenAndServe())
}

0 comments on commit d58859c

Please sign in to comment.