Skip to content

Commit

Permalink
new package
Browse files Browse the repository at this point in the history
  • Loading branch information
jharshman committed May 8, 2020
0 parents commit 1ad3850
Show file tree
Hide file tree
Showing 3 changed files with 355 additions and 0 deletions.
158 changes: 158 additions & 0 deletions async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Package async provides the types, functions, and methods to facilitate the safe running
and closing of asynchronous tasks.
To get started, let's take a look at async.Job. As you can see in the example below
we are defining an async.Job called myJob, and stubbing out the Run and Close fields.
These fields are functions that will control the running and safe closing of your Job.
myJob := async.Job{
Run: func() {
// do my thing
},
Close: func() {
// close my thing
},
}
Running an HTTP server with async.Job might look like the following:
myJob := async.Job{
Run: func() error {
return http.ListenAndServe()
},
Close: func() error {
return s.Shutdown(context.Background())
},
}
myJob.Execute()
By default, the function defined for async.Job.Close will trigger when a syscall.SIGINT or
syscall.SIGTERM is received. You can modify these defaults by setting your own on the async.Job.
myJob := async.Job{
Run: func() error {
return http.ListenAndServe()
},
Close: func() error {
return s.Shutdown(context.Background())
},
Signals: []os.Signal{syscall.SIGHUP},
}
myJob.Execute()
*/
package async

import (
"fmt"
"os"
"os/signal"
"syscall"
)

type SafeCloser interface {
RunWithClose() (sig, ack chan int, err chan error)
}

type Job struct {
// Run And Close functions.
// Both required iff using Execute() or RunWithClose().
Run func() error
Close func() error

// Signals is a slice of os.Signal to notify on.
// This is used by Execute(). Defaults to SIGINT and SIGTERM.
Signals []os.Signal

// todo: decide if this is in fact useful
// Pointer to next Job. Useful for chaining order of operations.
Next *Job

// references to job comm channels
sig *chan int
ack *chan int
err *chan error
}

// RunWithClose executes the function defined in Job.Run as a
// goroutine. It returns three channels to the caller to facilitate
// communication. Once signaled on the "sig" channel, the function
// defined in Job.Close will be called. Once Job.Close has finished,
// the caller is sent a final message on the "ack" channel.
// All errors are reported through the "err" channel.
func (j *Job) RunWithClose() (sig, ack chan int, err chan error) {
sig = make(chan int, 1)
ack = make(chan int, 1)
err = make(chan error, 1)

j.sig = &sig
j.ack = &ack
j.err = &err

go func() {
go func() {
if e := j.Run(); e != nil {
err <- e
}
}()
<-sig
if e := j.Close(); e != nil {
err <- e
}
ack <- 1
}()
return
}

// Execute is a blocking method that calls RunWithClose and
// sets up a channel to listen for signals defined in Job.Signals.
// Will return error if RunWithClose results in an error from either
// Job.Run or Job.Close.
func (j *Job) Execute() error {

// sanity check for job, requires both Run and Close functions defined.
if j.Run == nil || j.Close == nil {
return fmt.Errorf("either Run or Close fields missing")
}

sig, ack, err := j.RunWithClose()

closeChan := make(chan os.Signal, 1)
if len(j.Signals) == 0 {
j.Signals = []os.Signal{
syscall.SIGINT,
syscall.SIGTERM,
}
}
signal.Notify(closeChan, j.Signals...)

LOOP:
for {
select {
case <-closeChan:
sig <- 1
case <-ack:
break LOOP
case e := <-err:
return e
}
}

// todo: think a bit more on the job.Next functionality
//// check for next
//if j.Next != nil {
// if nextErr := j.Next.Execute(); nextErr != nil {
// return nextErr
// }
//}

return nil
}

// Helper function to signal a job to close.
func (j *Job) SignalToClose() {
*j.sig <- 1
}
194 changes: 194 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package async_test

import (
"context"
"errors"
"net/http"
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/jharshman/async"
)

func Test_RunWithClose(t *testing.T) {
s := http.Server{
Addr: ":8080",
Handler: http.DefaultServeMux,
}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})

job := async.Job{
Run: func() error {
return s.ListenAndServe()
},
Close: func() error {
return s.Shutdown(context.Background())
},
}

sig, ack, err := job.RunWithClose()

closeChan := make(chan os.Signal, 1)
signal.Notify(closeChan, syscall.SIGTERM, syscall.SIGINT)

// go routine to notify close after short wait
go func() {
<-time.After(time.Second * 5)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

LOOP:
for {
select {
case <-closeChan:
sig <- 1
break LOOP
case <-ack:
break LOOP
case e := <-err:
t.Errorf("%v\n", e)
break LOOP
}
}
}

func TestJob_Execute(t *testing.T) {
s := http.Server{
Addr: ":8080",
Handler: http.DefaultServeMux,
}

http.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})

job := async.Job{
Run: func() error {
return s.ListenAndServe()
},
Close: func() error {
return s.Shutdown(context.Background())
},
Signals: []os.Signal{syscall.SIGINT},
}

// go routine to notify close after short wait
go func() {
<-time.After(time.Second * 5)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

err := job.Execute()
if err != nil {
t.Error(err)
}
}

func TestJob_ExecuteRunWithErrors(t *testing.T) {
job := async.Job{
Run: func() error {
return errors.New("some error")
},
Close: func() error {
return nil
},
Signals: []os.Signal{syscall.SIGINT},
}

// go routine to notify close after short wait
go func() {
<-time.After(time.Second * 5)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

// error expected here
err := job.Execute()
if err == nil {
t.Error(err)
}
}

func TestJob_ExecuteCloseWithErrors(t *testing.T) {
job := async.Job{
Run: func() error {
return nil
},
Close: func() error {
return errors.New("some error")
},
Signals: []os.Signal{syscall.SIGINT},
}

// go routine to notify close after short wait
go func() {
<-time.After(time.Second * 5)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

// error expected here
err := job.Execute()
if err == nil {
t.Error(err)
}
}

func TestJob_ExecuteNoCloseDefined(t *testing.T) {
job := async.Job{
Run: func() error {
return nil
},
Signals: []os.Signal{syscall.SIGINT},
}

// error expected here
err := job.Execute()
if err == nil {
t.Error(err)
}
}

func TestJob_ExecuteNoRunDefined(t *testing.T) {
job := async.Job{
Close: func() error {
return nil
},
Signals: []os.Signal{syscall.SIGINT},
}

// error expected here
err := job.Execute()
if err == nil {
t.Error(err)
}
}

func TestJob_SignalToClose(t *testing.T) {
job := async.Job{
Run: func() error {
return nil
},
Close: func() error {
return nil
},
Signals: []os.Signal{syscall.SIGINT},
}

// go routine to notify close after short wait
go func() {
<-time.After(time.Second * 5)
job.SignalToClose()
}()

err := job.Execute()
if err != nil {
t.Error(err)
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/jharshman/async

go 1.14

0 comments on commit 1ad3850

Please sign in to comment.