Skip to content

Commit

Permalink
refactor(job)!: eliminate data race; add callback
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Oct 22, 2023
1 parent f90eaa3 commit 8a92ecc
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@v4

- name: Run coverage
run: go test ./... -coverprofile=coverage.out -covermode=atomic
run: go test -race ./... -coverprofile=coverage.out -covermode=atomic

- name: Upload coverage to Codecov
if: ${{ matrix.go-version == '1.18.x' }}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func main() {
shellJob := quartz.NewShellJob("ls -la")

request, _ := http.NewRequest(http.MethodGet, "https://worldtimeapi.org/api/timezone/utc", nil)
curlJob, _ := quartz.NewCurlJob(request)
curlJob := quartz.NewCurlJob(request)

functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil })

Expand Down
14 changes: 5 additions & 9 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
fmt.Println(err)
return
}
curlJob, err := quartz.NewCurlJob(request)
if err != nil {
fmt.Println(err)
return
}
curlJob := quartz.NewCurlJob(request)
functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil })

_ = sched.ScheduleJob(ctx, shellJob, cronTrigger)
Expand All @@ -101,15 +97,15 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
time.Sleep(time.Second * 10)

fmt.Println(sched.GetJobKeys())
fmt.Println(shellJob.Result)
fmt.Println(shellJob.Stdout())

responseBody, err := io.ReadAll(curlJob.Response.Body)
responseBody, err := io.ReadAll(curlJob.Response().Body)
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("%s\n%s\n", curlJob.Response.Status, string(responseBody))
fmt.Printf("%s\n%s\n", curlJob.Response().Status, string(responseBody))
}
fmt.Printf("Function job result: %v\n", *functionJob.Result)
fmt.Printf("Function job result: %v\n", *functionJob.Result())

time.Sleep(time.Second * 2)
sched.Stop()
Expand Down
57 changes: 40 additions & 17 deletions quartz/function_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,30 @@ package quartz
import (
"context"
"fmt"
"sync"
)

// Function represents an argument-less function which returns a generic type R and a possible error.
// Function represents an argument-less function which returns
// a generic type R and a possible error.
type Function[R any] func(context.Context) (R, error)

// FunctionJob represents a Job that invokes the passed Function, implements the quartz.Job interface.
// FunctionJob represents a Job that invokes the passed Function,
// implements the quartz.Job interface.
type FunctionJob[R any] struct {
sync.RWMutex
function *Function[R]
desc string
Result *R
Error error
JobStatus JobStatus
result *R
err error
jobStatus JobStatus
}

// NewFunctionJob returns a new FunctionJob without an explicit description.
func NewFunctionJob[R any](function Function[R]) *FunctionJob[R] {
return &FunctionJob[R]{
function: &function,
desc: fmt.Sprintf("FunctionJob:%p", &function),
Result: nil,
Error: nil,
JobStatus: NA,
jobStatus: NA,
}
}

Expand All @@ -33,9 +35,7 @@ func NewFunctionJobWithDesc[R any](desc string, function Function[R]) *FunctionJ
return &FunctionJob[R]{
function: &function,
desc: desc,
Result: nil,
Error: nil,
JobStatus: NA,
jobStatus: NA,
}
}

Expand All @@ -53,13 +53,36 @@ func (f *FunctionJob[R]) Key() int {
// It invokes the held function, setting the results in Result and Error members.
func (f *FunctionJob[R]) Execute(ctx context.Context) {
result, err := (*f.function)(ctx)
f.Lock()
if err != nil {
f.JobStatus = FAILURE
f.Result = nil
f.Error = err
f.jobStatus = FAILURE
f.result = nil
f.err = err
} else {
f.JobStatus = OK
f.Error = nil
f.Result = &result
f.jobStatus = OK
f.result = &result
f.err = nil
}
f.Unlock()
}

// Result returns the result of the FunctionJob.
func (f *FunctionJob[R]) Result() *R {
f.RLock()
defer f.RUnlock()
return f.result
}

// Error returns the error of the FunctionJob.
func (f *FunctionJob[R]) Error() error {
f.RLock()
defer f.RUnlock()
return f.err
}

// JobStatus returns the status of the FunctionJob.
func (f *FunctionJob[R]) JobStatus() JobStatus {
f.RLock()
defer f.RUnlock()
return f.jobStatus
}
27 changes: 14 additions & 13 deletions quartz/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quartz_test
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand All @@ -13,14 +14,14 @@ func TestFunctionJob(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n = 2
var n int32 = 2
funcJob1 := quartz.NewFunctionJob(func(_ context.Context) (string, error) {
n += 2
atomic.AddInt32(&n, 2)
return "fired1", nil
})

funcJob2 := quartz.NewFunctionJob(func(_ context.Context) (int, error) {
n += 2
atomic.AddInt32(&n, 2)
return 42, nil
})

Expand All @@ -32,15 +33,15 @@ func TestFunctionJob(t *testing.T) {
sched.Clear()
sched.Stop()

assertEqual(t, funcJob1.JobStatus, quartz.OK)
assertNotEqual(t, funcJob1.Result, nil)
assertEqual(t, *funcJob1.Result, "fired1")
assertEqual(t, funcJob1.JobStatus(), quartz.OK)
assertNotEqual(t, funcJob1.Result(), nil)
assertEqual(t, *funcJob1.Result(), "fired1")

assertEqual(t, funcJob2.JobStatus, quartz.OK)
assertNotEqual(t, funcJob2.Result, nil)
assertEqual(t, *funcJob2.Result, 42)
assertEqual(t, funcJob2.JobStatus(), quartz.OK)
assertNotEqual(t, funcJob2.Result(), nil)
assertEqual(t, *funcJob2.Result(), 42)

assertEqual(t, n, 6)
assertEqual(t, int(atomic.LoadInt32(&n)), 6)
}

func TestNewFunctionJobWithDescAndKey(t *testing.T) {
Expand Down Expand Up @@ -88,10 +89,10 @@ func TestFunctionJobRespectsContext(t *testing.T) {
if n != -1 {
t.Fatal("job side effect should have reflected cancelation:", n)
}
if !errors.Is(funcJob2.Error, context.Canceled) {
t.Fatal("unexpected error function", funcJob2.Error)
if !errors.Is(funcJob2.Error(), context.Canceled) {
t.Fatal("unexpected error function", funcJob2.Error())
}
if funcJob2.Result != nil {
if funcJob2.Result() != nil {
t.Fatal("errored jobs should not return values")
}
}
Loading

0 comments on commit 8a92ecc

Please sign in to comment.