From 6d948a5da845511264a0f0c6486dab614bb5d40f Mon Sep 17 00:00:00 2001 From: "Eugene R." Date: Tue, 20 Aug 2024 22:16:38 +0300 Subject: [PATCH] refactor!: generic types; encapsulate internal structures (#34) --- examples/future/main.go | 3 +- future.go | 81 +++++++++++++++++++++-------------------- future_test.go | 66 ++++++++++++++++----------------- future_utils.go | 18 +++++---- once.go | 8 ++-- once_test.go | 36 ++++++++++++------ promise.go | 52 +++++++++----------------- task.go | 14 ++++--- task_test.go | 15 +++++++- 9 files changed, 153 insertions(+), 140 deletions(-) diff --git a/examples/future/main.go b/examples/future/main.go index c045318..409cc62 100644 --- a/examples/future/main.go +++ b/examples/future/main.go @@ -5,7 +5,6 @@ import ( "time" "github.com/reugn/async" - "github.com/reugn/async/internal/util" ) func main() { @@ -21,7 +20,7 @@ func asyncAction() async.Future[string] { promise := async.NewPromise[string]() go func() { time.Sleep(time.Second) - promise.Success(util.Ptr("OK")) + promise.Success("OK") }() return promise.Future() diff --git a/future.go b/future.go index 631ae10..f4b644d 100644 --- a/future.go +++ b/future.go @@ -13,54 +13,54 @@ type Future[T any] interface { // Map creates a new Future by applying a function to the successful // result of this Future. - Map(func(*T) (*T, error)) Future[T] + Map(func(T) (T, error)) Future[T] // FlatMap creates a new Future by applying a function to the successful // result of this Future. - FlatMap(func(*T) (Future[T], error)) Future[T] + FlatMap(func(T) (Future[T], error)) Future[T] // Join blocks until the Future is completed and returns either a result // or an error. - Join() (*T, error) + Join() (T, error) // Get blocks for at most the given time duration for this Future to // complete and returns either a result or an error. - Get(time.Duration) (*T, error) + Get(time.Duration) (T, error) // Recover handles any error that this Future might contain using a // resolver function. - Recover(func() (*T, error)) Future[T] + Recover(func() (T, error)) Future[T] // RecoverWith handles any error that this Future might contain using // another Future. RecoverWith(Future[T]) Future[T] // complete completes the Future with either a value or an error. - // Is used by Promise internally. - complete(*T, error) + // It is used by [Promise] internally. + complete(T, error) } -// FutureImpl implements the Future interface. -type FutureImpl[T any] struct { +// futureImpl implements the Future interface. +type futureImpl[T any] struct { acceptOnce sync.Once completeOnce sync.Once done chan any - value *T + value T err error } -// Verify FutureImpl satisfies the Future interface. -var _ Future[any] = (*FutureImpl[any])(nil) +// Verify futureImpl satisfies the Future interface. +var _ Future[any] = (*futureImpl[any])(nil) -// NewFuture returns a new Future. -func NewFuture[T any]() Future[T] { - return &FutureImpl[T]{ +// newFuture returns a new Future. +func newFuture[T any]() Future[T] { + return &futureImpl[T]{ done: make(chan any, 1), } } // accept blocks once, until the Future result is available. -func (fut *FutureImpl[T]) accept() { +func (fut *futureImpl[T]) accept() { fut.acceptOnce.Do(func() { result := <-fut.done fut.setResult(result) @@ -69,7 +69,7 @@ func (fut *FutureImpl[T]) accept() { // acceptTimeout blocks once, until the Future result is available or until // the timeout expires. -func (fut *FutureImpl[T]) acceptTimeout(timeout time.Duration) { +func (fut *futureImpl[T]) acceptTimeout(timeout time.Duration) { fut.acceptOnce.Do(func() { timer := time.NewTimer(timeout) defer timer.Stop() @@ -83,23 +83,24 @@ func (fut *FutureImpl[T]) acceptTimeout(timeout time.Duration) { } // setResult assigns a value to the Future instance. -func (fut *FutureImpl[T]) setResult(result any) { +func (fut *futureImpl[T]) setResult(result any) { switch value := result.(type) { case error: fut.err = value default: - fut.value = value.(*T) + fut.value = value.(T) } } // Map creates a new Future by applying a function to the successful result // of this Future and returns the result of the function as a new Future. -func (fut *FutureImpl[T]) Map(f func(*T) (*T, error)) Future[T] { - next := NewFuture[T]() +func (fut *futureImpl[T]) Map(f func(T) (T, error)) Future[T] { + next := newFuture[T]() go func() { fut.accept() if fut.err != nil { - next.complete(nil, fut.err) + var zero T + next.complete(zero, fut.err) } else { next.complete(f(fut.value)) } @@ -109,16 +110,18 @@ func (fut *FutureImpl[T]) Map(f func(*T) (*T, error)) Future[T] { // FlatMap creates a new Future by applying a function to the successful result // of this Future and returns the result of the function as a new Future. -func (fut *FutureImpl[T]) FlatMap(f func(*T) (Future[T], error)) Future[T] { - next := NewFuture[T]() +func (fut *futureImpl[T]) FlatMap(f func(T) (Future[T], error)) Future[T] { + next := newFuture[T]() go func() { fut.accept() if fut.err != nil { - next.complete(nil, fut.err) + var zero T + next.complete(zero, fut.err) } else { tfut, terr := f(fut.value) if terr != nil { - next.complete(nil, terr) + var zero T + next.complete(zero, terr) } else { next.complete(tfut.Join()) } @@ -129,14 +132,14 @@ func (fut *FutureImpl[T]) FlatMap(f func(*T) (Future[T], error)) Future[T] { // Join blocks until the Future is completed and returns either // a result or an error. -func (fut *FutureImpl[T]) Join() (*T, error) { +func (fut *futureImpl[T]) Join() (T, error) { fut.accept() return fut.value, fut.err } // Get blocks for at most the given time duration for this Future to // complete and returns either a result or an error. -func (fut *FutureImpl[T]) Get(timeout time.Duration) (*T, error) { +func (fut *futureImpl[T]) Get(timeout time.Duration) (T, error) { fut.acceptTimeout(timeout) return fut.value, fut.err } @@ -144,8 +147,8 @@ func (fut *FutureImpl[T]) Get(timeout time.Duration) (*T, error) { // Recover handles any error that this Future might contain using // a given resolver function. // Returns the result as a new Future. -func (fut *FutureImpl[T]) Recover(f func() (*T, error)) Future[T] { - next := NewFuture[T]() +func (fut *futureImpl[T]) Recover(f func() (T, error)) Future[T] { + next := newFuture[T]() go func() { fut.accept() if fut.err != nil { @@ -160,8 +163,8 @@ func (fut *FutureImpl[T]) Recover(f func() (*T, error)) Future[T] { // RecoverWith handles any error that this Future might contain using // another Future. // Returns the result as a new Future. -func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] { - next := NewFuture[T]() +func (fut *futureImpl[T]) RecoverWith(rf Future[T]) Future[T] { + next := newFuture[T]() go func() { fut.accept() if fut.err != nil { @@ -174,14 +177,12 @@ func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] { } // complete completes the Future with either a value or an error. -func (fut *FutureImpl[T]) complete(value *T, err error) { +func (fut *futureImpl[T]) complete(value T, err error) { fut.completeOnce.Do(func() { - go func() { - if err != nil { - fut.done <- err - } else { - fut.done <- value - } - }() + if err != nil { + fut.done <- err + } else { + fut.done <- value + } }) } diff --git a/future_test.go b/future_test.go index 81ab04b..818927f 100644 --- a/future_test.go +++ b/future_test.go @@ -16,18 +16,18 @@ func TestFuture(t *testing.T) { p := NewPromise[bool]() go func() { time.Sleep(100 * time.Millisecond) - p.Success(util.Ptr(true)) + p.Success(true) }() res, err := p.Future().Join() - assert.Equal(t, true, *res) + assert.Equal(t, true, res) assert.IsNil(t, err) } -func TestFutureUtils(t *testing.T) { - p1 := NewPromise[int]() - p2 := NewPromise[int]() - p3 := NewPromise[int]() +func TestFuture_Utils(t *testing.T) { + p1 := NewPromise[*int]() + p2 := NewPromise[*int]() + p3 := NewPromise[*int]() res1 := util.Ptr(1) res2 := util.Ptr(2) @@ -41,28 +41,28 @@ func TestFutureUtils(t *testing.T) { time.Sleep(300 * time.Millisecond) p3.Failure(err3) }() - arr := []Future[int]{p1.Future(), p2.Future(), p3.Future()} + arr := []Future[*int]{p1.Future(), p2.Future(), p3.Future()} res := []any{res1, res2, err3} futRes, _ := FutureSeq(arr).Join() - assert.Equal(t, res, *futRes) + assert.Equal(t, res, futRes) } -func TestFutureFirstCompleted(t *testing.T) { - p := NewPromise[bool]() +func TestFuture_FirstCompleted(t *testing.T) { + p := NewPromise[*bool]() go func() { time.Sleep(100 * time.Millisecond) p.Success(util.Ptr(true)) }() - timeout := FutureTimer[bool](10 * time.Millisecond) + timeout := FutureTimer[*bool](10 * time.Millisecond) futRes, futErr := FutureFirstCompletedOf(p.Future(), timeout).Join() assert.IsNil(t, futRes) assert.NotEqual(t, futErr, nil) } -func TestFutureTransform(t *testing.T) { - p1 := NewPromise[int]() +func TestFuture_Transform(t *testing.T) { + p1 := NewPromise[*int]() go func() { time.Sleep(100 * time.Millisecond) p1.Success(util.Ptr(1)) @@ -70,9 +70,9 @@ func TestFutureTransform(t *testing.T) { future := p1.Future().Map(func(v *int) (*int, error) { inc := *v + 1 return &inc, nil - }).FlatMap(func(v *int) (Future[int], error) { + }).FlatMap(func(v *int) (Future[*int], error) { inc := *v + 1 - p2 := NewPromise[int]() + p2 := NewPromise[*int]() p2.Success(&inc) return p2.Future(), nil }).Recover(func() (*int, error) { @@ -86,37 +86,37 @@ func TestFutureTransform(t *testing.T) { assert.Equal(t, 3, *res) } -func TestFutureRecover(t *testing.T) { +func TestFuture_Recover(t *testing.T) { p1 := NewPromise[int]() p2 := NewPromise[int]() go func() { time.Sleep(10 * time.Millisecond) - p1.Success(util.Ptr(1)) + p1.Success(1) time.Sleep(10 * time.Millisecond) p2.Failure(errors.New("recover Future failure")) }() - future := p1.Future().Map(func(_ *int) (*int, error) { - return nil, errors.New("map error") - }).FlatMap(func(_ *int) (Future[int], error) { + future := p1.Future().Map(func(_ int) (int, error) { + return 0, errors.New("map error") + }).FlatMap(func(_ int) (Future[int], error) { p2 := NewPromise[int]() p2.Failure(errors.New("flatMap Future failure")) return p2.Future(), nil - }).FlatMap(func(_ *int) (Future[int], error) { + }).FlatMap(func(_ int) (Future[int], error) { return nil, errors.New("flatMap error") - }).Recover(func() (*int, error) { - return nil, errors.New("recover error") - }).RecoverWith(p2.Future()).Recover(func() (*int, error) { - return util.Ptr(2), nil + }).Recover(func() (int, error) { + return 0, errors.New("recover error") + }).RecoverWith(p2.Future()).Recover(func() (int, error) { + return 2, nil }) res, err := future.Join() - assert.Equal(t, 2, *res) + assert.Equal(t, 2, res) assert.IsNil(t, err) } -func TestFutureFailure(t *testing.T) { - p1 := NewPromise[int]() - p2 := NewPromise[int]() +func TestFuture_Failure(t *testing.T) { + p1 := NewPromise[*int]() + p2 := NewPromise[*int]() go func() { time.Sleep(10 * time.Millisecond) p1.Failure(errors.New("Future error")) @@ -128,11 +128,11 @@ func TestFutureFailure(t *testing.T) { assert.Equal(t, 2, *res) } -func TestFutureTimeout(t *testing.T) { +func TestFuture_Timeout(t *testing.T) { p := NewPromise[bool]() go func() { time.Sleep(100 * time.Millisecond) - p.Success(util.Ptr(true)) + p.Success(true) }() future := p.Future() @@ -143,14 +143,14 @@ func TestFutureTimeout(t *testing.T) { assert.ErrorContains(t, err, "timeout") } -func TestFutureGoroutineLeak(t *testing.T) { +func TestFuture_GoroutineLeak(t *testing.T) { var wg sync.WaitGroup fmt.Println(runtime.NumGoroutine()) numFuture := 100 for i := 0; i < numFuture; i++ { - promise := NewPromise[string]() + promise := NewPromise[*string]() wg.Add(1) go func() { defer wg.Done() diff --git a/future_utils.go b/future_utils.go index a84966b..15195ce 100644 --- a/future_utils.go +++ b/future_utils.go @@ -6,20 +6,20 @@ import ( ) // FutureSeq reduces many Futures into a single Future. -// The resulting array may contain both *T values and errors. +// The resulting array may contain both T values and errors. func FutureSeq[T any](futures []Future[T]) Future[[]any] { - next := NewFuture[[]any]() + next := newFuture[[]any]() go func() { seq := make([]any, len(futures)) for i, future := range futures { - res, err := future.Join() + result, err := future.Join() if err != nil { seq[i] = err } else { - seq[i] = res + seq[i] = result } } - next.complete(&seq, nil) + next.complete(seq, nil) }() return next } @@ -28,7 +28,7 @@ func FutureSeq[T any](futures []Future[T]) Future[[]any] { // of the first Future in the list that is completed. // This means no matter if it is completed as a success or as a failure. func FutureFirstCompletedOf[T any](futures ...Future[T]) Future[T] { - next := NewFuture[T]() + next := newFuture[T]() go func() { for _, f := range futures { go func(future Future[T]) { @@ -42,11 +42,13 @@ func FutureFirstCompletedOf[T any](futures ...Future[T]) Future[T] { // FutureTimer returns Future that will have been resolved after given duration; // useful for FutureFirstCompletedOf for timeout purposes. func FutureTimer[T any](d time.Duration) Future[T] { - next := NewFuture[T]() + next := newFuture[T]() go func() { timer := time.NewTimer(d) <-timer.C - next.complete(nil, fmt.Errorf("FutureTimer %s timeout", d)) + var zero T + next.(*futureImpl[T]). + complete(zero, fmt.Errorf("FutureTimer %s timeout", d)) }() return next } diff --git a/once.go b/once.go index 30f7f6f..acc5ab6 100644 --- a/once.go +++ b/once.go @@ -9,14 +9,14 @@ import ( // Any subsequent call will return the previous result. type Once[T any] struct { runOnce sync.Once - result *T + result T err error } // Do calls the function f if and only if Do is being called for the // first time for this instance of Once. In other words, given // -// var once Once +// var once Once[T] // // if once.Do(f) is called multiple times, only the first call will invoke f, // even if f has a different value in each invocation. A new instance of @@ -26,8 +26,8 @@ type Once[T any] struct { // first execution. // // If f panics, Do considers it to have returned; future calls of Do return -// without calling f -func (o *Once[T]) Do(f func() (*T, error)) (*T, error) { +// without calling f. +func (o *Once[T]) Do(f func() (T, error)) (T, error) { o.runOnce.Do(func() { defer func() { if err := recover(); err != nil { diff --git a/once_test.go b/once_test.go index 758f9cc..b8e4051 100644 --- a/once_test.go +++ b/once_test.go @@ -6,23 +6,37 @@ import ( "testing" "github.com/reugn/async/internal/assert" + "github.com/reugn/async/internal/util" ) func TestOnce(t *testing.T) { - var once Once[int32] - count := new(int32) + var once Once[int] + var count int for i := 0; i < 10; i++ { - count, _ = once.Do(func() (*int32, error) { + count, _ = once.Do(func() (int, error) { + count++ + return count, nil + }) + } + assert.Equal(t, 1, count) +} + +func TestOnce_Ptr(t *testing.T) { + var once Once[*int] + count := new(int) + + for i := 0; i < 10; i++ { + count, _ = once.Do(func() (*int, error) { *count++ return count, nil }) } - assert.Equal(t, 1, *count) + assert.Equal(t, util.Ptr(1), count) } -func TestOnceConcurrent(t *testing.T) { - var once Once[int32] +func TestOnce_Concurrent(t *testing.T) { + var once Once[*int32] var count atomic.Int32 var wg sync.WaitGroup @@ -41,16 +55,16 @@ func TestOnceConcurrent(t *testing.T) { assert.Equal(t, 1, int(count.Load())) } -func TestOncePanic(t *testing.T) { - var once Once[int32] - count := new(int32) +func TestOnce_Panic(t *testing.T) { + var once Once[*int] + count := new(int) var err error for i := 0; i < 10; i++ { - count, err = once.Do(func() (*int32, error) { + count, err = once.Do(func() (*int, error) { *count /= *count return count, nil }) } - assert.Equal(t, "recovered runtime error: integer divide by zero", err.Error()) + assert.ErrorContains(t, err, "integer divide by zero") } diff --git a/promise.go b/promise.go index 7ee3cfe..7e19e09 100644 --- a/promise.go +++ b/promise.go @@ -7,7 +7,7 @@ import "sync" type Promise[T any] interface { // Success completes the underlying Future with a value. - Success(*T) + Success(T) // Failure fails the underlying Future with an error. Failure(error) @@ -16,54 +16,38 @@ type Promise[T any] interface { Future() Future[T] } -type promiseStatus uint8 - -const ( - ready promiseStatus = iota - completed -) - -// PromiseImpl implements the Promise interface. -type PromiseImpl[T any] struct { - sync.Mutex +// promiseImpl implements the Promise interface. +type promiseImpl[T any] struct { + once sync.Once future Future[T] - status promiseStatus } -// Verify PromiseImpl satisfies the Promise interface. -var _ Promise[any] = (*PromiseImpl[any])(nil) +// Verify promiseImpl satisfies the Promise interface. +var _ Promise[any] = (*promiseImpl[any])(nil) -// NewPromise returns a new PromiseImpl. +// NewPromise returns a new Promise. func NewPromise[T any]() Promise[T] { - return &PromiseImpl[T]{ - future: NewFuture[T](), - status: ready, + return &promiseImpl[T]{ + future: newFuture[T](), } } // Success completes the underlying Future with a given value. -func (p *PromiseImpl[T]) Success(value *T) { - p.Lock() - defer p.Unlock() - - if p.status != completed { +func (p *promiseImpl[T]) Success(value T) { + p.once.Do(func() { p.future.complete(value, nil) - p.status = completed - } + }) } // Failure fails the underlying Future with a given error. -func (p *PromiseImpl[T]) Failure(err error) { - p.Lock() - defer p.Unlock() - - if p.status != completed { - p.future.complete(nil, err) - p.status = completed - } +func (p *promiseImpl[T]) Failure(err error) { + p.once.Do(func() { + var zero T + p.future.complete(zero, err) + }) } // Future returns the underlying Future. -func (p *PromiseImpl[T]) Future() Future[T] { +func (p *promiseImpl[T]) Future() Future[T] { return p.future } diff --git a/task.go b/task.go index 3fbff94..df6de32 100644 --- a/task.go +++ b/task.go @@ -3,23 +3,25 @@ package async // Task is a data type for controlling possibly lazy and // asynchronous computations. type Task[T any] struct { - taskFunc func() (*T, error) + taskFunc func() (T, error) } -// NewTask returns a new Task. -func NewTask[T any](taskFunc func() (*T, error)) *Task[T] { +// NewTask returns a new Task associated with the specified function. +func NewTask[T any](taskFunc func() (T, error)) *Task[T] { return &Task[T]{ taskFunc: taskFunc, } } -// Call executes the Task and returns a Future. +// Call starts executing the task using a goroutine. It returns a +// Future which can be used to retrieve the result or error of the +// task when it is completed. func (task *Task[T]) Call() Future[T] { promise := NewPromise[T]() go func() { - res, err := task.taskFunc() + result, err := task.taskFunc() if err == nil { - promise.Success(res) + promise.Success(result) } else { promise.Failure(err) } diff --git a/task_test.go b/task_test.go index c9fb4f4..7b79fda 100644 --- a/task_test.go +++ b/task_test.go @@ -9,7 +9,18 @@ import ( "github.com/reugn/async/internal/util" ) -func TestTaskSuccess(t *testing.T) { +func TestTask_Success(t *testing.T) { + task := NewTask(func() (string, error) { + time.Sleep(10 * time.Millisecond) + return "ok", nil + }) + res, err := task.Call().Join() + + assert.Equal(t, "ok", res) + assert.IsNil(t, err) +} + +func TestTask_SuccessPtr(t *testing.T) { task := NewTask(func() (*string, error) { time.Sleep(10 * time.Millisecond) return util.Ptr("ok"), nil @@ -20,7 +31,7 @@ func TestTaskSuccess(t *testing.T) { assert.IsNil(t, err) } -func TestTaskFailure(t *testing.T) { +func TestTask_Failure(t *testing.T) { task := NewTask(func() (*string, error) { time.Sleep(10 * time.Millisecond) return nil, errors.New("error")