Skip to content

Commit

Permalink
refactor!: generic types; encapsulate internal structures (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Aug 20, 2024
1 parent 1648eed commit 6d948a5
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 140 deletions.
3 changes: 1 addition & 2 deletions examples/future/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/reugn/async"
"github.com/reugn/async/internal/util"
)

func main() {
Expand All @@ -21,7 +20,7 @@ func asyncAction() async.Future[string] {
promise := async.NewPromise[string]()
go func() {
time.Sleep(time.Second)
promise.Success(util.Ptr("OK"))
promise.Success("OK")
}()

return promise.Future()
Expand Down
81 changes: 41 additions & 40 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,54 @@ type Future[T any] interface {

// Map creates a new Future by applying a function to the successful
// result of this Future.
Map(func(*T) (*T, error)) Future[T]
Map(func(T) (T, error)) Future[T]

// FlatMap creates a new Future by applying a function to the successful
// result of this Future.
FlatMap(func(*T) (Future[T], error)) Future[T]
FlatMap(func(T) (Future[T], error)) Future[T]

// Join blocks until the Future is completed and returns either a result
// or an error.
Join() (*T, error)
Join() (T, error)

// Get blocks for at most the given time duration for this Future to
// complete and returns either a result or an error.
Get(time.Duration) (*T, error)
Get(time.Duration) (T, error)

// Recover handles any error that this Future might contain using a
// resolver function.
Recover(func() (*T, error)) Future[T]
Recover(func() (T, error)) Future[T]

// RecoverWith handles any error that this Future might contain using
// another Future.
RecoverWith(Future[T]) Future[T]

// complete completes the Future with either a value or an error.
// Is used by Promise internally.
complete(*T, error)
// It is used by [Promise] internally.
complete(T, error)
}

// FutureImpl implements the Future interface.
type FutureImpl[T any] struct {
// futureImpl implements the Future interface.
type futureImpl[T any] struct {
acceptOnce sync.Once
completeOnce sync.Once
done chan any
value *T
value T
err error
}

// Verify FutureImpl satisfies the Future interface.
var _ Future[any] = (*FutureImpl[any])(nil)
// Verify futureImpl satisfies the Future interface.
var _ Future[any] = (*futureImpl[any])(nil)

// NewFuture returns a new Future.
func NewFuture[T any]() Future[T] {
return &FutureImpl[T]{
// newFuture returns a new Future.
func newFuture[T any]() Future[T] {
return &futureImpl[T]{
done: make(chan any, 1),
}
}

// accept blocks once, until the Future result is available.
func (fut *FutureImpl[T]) accept() {
func (fut *futureImpl[T]) accept() {
fut.acceptOnce.Do(func() {
result := <-fut.done
fut.setResult(result)
Expand All @@ -69,7 +69,7 @@ func (fut *FutureImpl[T]) accept() {

// acceptTimeout blocks once, until the Future result is available or until
// the timeout expires.
func (fut *FutureImpl[T]) acceptTimeout(timeout time.Duration) {
func (fut *futureImpl[T]) acceptTimeout(timeout time.Duration) {
fut.acceptOnce.Do(func() {
timer := time.NewTimer(timeout)
defer timer.Stop()
Expand All @@ -83,23 +83,24 @@ func (fut *FutureImpl[T]) acceptTimeout(timeout time.Duration) {
}

// setResult assigns a value to the Future instance.
func (fut *FutureImpl[T]) setResult(result any) {
func (fut *futureImpl[T]) setResult(result any) {
switch value := result.(type) {
case error:
fut.err = value
default:
fut.value = value.(*T)
fut.value = value.(T)
}
}

// Map creates a new Future by applying a function to the successful result
// of this Future and returns the result of the function as a new Future.
func (fut *FutureImpl[T]) Map(f func(*T) (*T, error)) Future[T] {
next := NewFuture[T]()
func (fut *futureImpl[T]) Map(f func(T) (T, error)) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
next.complete(nil, fut.err)
var zero T
next.complete(zero, fut.err)
} else {
next.complete(f(fut.value))
}
Expand All @@ -109,16 +110,18 @@ func (fut *FutureImpl[T]) Map(f func(*T) (*T, error)) Future[T] {

// FlatMap creates a new Future by applying a function to the successful result
// of this Future and returns the result of the function as a new Future.
func (fut *FutureImpl[T]) FlatMap(f func(*T) (Future[T], error)) Future[T] {
next := NewFuture[T]()
func (fut *futureImpl[T]) FlatMap(f func(T) (Future[T], error)) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
next.complete(nil, fut.err)
var zero T
next.complete(zero, fut.err)
} else {
tfut, terr := f(fut.value)
if terr != nil {
next.complete(nil, terr)
var zero T
next.complete(zero, terr)
} else {
next.complete(tfut.Join())
}
Expand All @@ -129,23 +132,23 @@ func (fut *FutureImpl[T]) FlatMap(f func(*T) (Future[T], error)) Future[T] {

// Join blocks until the Future is completed and returns either
// a result or an error.
func (fut *FutureImpl[T]) Join() (*T, error) {
func (fut *futureImpl[T]) Join() (T, error) {
fut.accept()
return fut.value, fut.err
}

// Get blocks for at most the given time duration for this Future to
// complete and returns either a result or an error.
func (fut *FutureImpl[T]) Get(timeout time.Duration) (*T, error) {
func (fut *futureImpl[T]) Get(timeout time.Duration) (T, error) {
fut.acceptTimeout(timeout)
return fut.value, fut.err
}

// Recover handles any error that this Future might contain using
// a given resolver function.
// Returns the result as a new Future.
func (fut *FutureImpl[T]) Recover(f func() (*T, error)) Future[T] {
next := NewFuture[T]()
func (fut *futureImpl[T]) Recover(f func() (T, error)) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
Expand All @@ -160,8 +163,8 @@ func (fut *FutureImpl[T]) Recover(f func() (*T, error)) Future[T] {
// RecoverWith handles any error that this Future might contain using
// another Future.
// Returns the result as a new Future.
func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
next := NewFuture[T]()
func (fut *futureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
next := newFuture[T]()
go func() {
fut.accept()
if fut.err != nil {
Expand All @@ -174,14 +177,12 @@ func (fut *FutureImpl[T]) RecoverWith(rf Future[T]) Future[T] {
}

// complete completes the Future with either a value or an error.
func (fut *FutureImpl[T]) complete(value *T, err error) {
func (fut *futureImpl[T]) complete(value T, err error) {
fut.completeOnce.Do(func() {
go func() {
if err != nil {
fut.done <- err
} else {
fut.done <- value
}
}()
if err != nil {
fut.done <- err
} else {
fut.done <- value
}
})
}
66 changes: 33 additions & 33 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ func TestFuture(t *testing.T) {
p := NewPromise[bool]()
go func() {
time.Sleep(100 * time.Millisecond)
p.Success(util.Ptr(true))
p.Success(true)
}()
res, err := p.Future().Join()

assert.Equal(t, true, *res)
assert.Equal(t, true, res)
assert.IsNil(t, err)
}

func TestFutureUtils(t *testing.T) {
p1 := NewPromise[int]()
p2 := NewPromise[int]()
p3 := NewPromise[int]()
func TestFuture_Utils(t *testing.T) {
p1 := NewPromise[*int]()
p2 := NewPromise[*int]()
p3 := NewPromise[*int]()

res1 := util.Ptr(1)
res2 := util.Ptr(2)
Expand All @@ -41,38 +41,38 @@ func TestFutureUtils(t *testing.T) {
time.Sleep(300 * time.Millisecond)
p3.Failure(err3)
}()
arr := []Future[int]{p1.Future(), p2.Future(), p3.Future()}
arr := []Future[*int]{p1.Future(), p2.Future(), p3.Future()}
res := []any{res1, res2, err3}
futRes, _ := FutureSeq(arr).Join()

assert.Equal(t, res, *futRes)
assert.Equal(t, res, futRes)
}

func TestFutureFirstCompleted(t *testing.T) {
p := NewPromise[bool]()
func TestFuture_FirstCompleted(t *testing.T) {
p := NewPromise[*bool]()
go func() {
time.Sleep(100 * time.Millisecond)
p.Success(util.Ptr(true))
}()
timeout := FutureTimer[bool](10 * time.Millisecond)
timeout := FutureTimer[*bool](10 * time.Millisecond)
futRes, futErr := FutureFirstCompletedOf(p.Future(), timeout).Join()

assert.IsNil(t, futRes)
assert.NotEqual(t, futErr, nil)
}

func TestFutureTransform(t *testing.T) {
p1 := NewPromise[int]()
func TestFuture_Transform(t *testing.T) {
p1 := NewPromise[*int]()
go func() {
time.Sleep(100 * time.Millisecond)
p1.Success(util.Ptr(1))
}()
future := p1.Future().Map(func(v *int) (*int, error) {
inc := *v + 1
return &inc, nil
}).FlatMap(func(v *int) (Future[int], error) {
}).FlatMap(func(v *int) (Future[*int], error) {
inc := *v + 1
p2 := NewPromise[int]()
p2 := NewPromise[*int]()
p2.Success(&inc)
return p2.Future(), nil
}).Recover(func() (*int, error) {
Expand All @@ -86,37 +86,37 @@ func TestFutureTransform(t *testing.T) {
assert.Equal(t, 3, *res)
}

func TestFutureRecover(t *testing.T) {
func TestFuture_Recover(t *testing.T) {
p1 := NewPromise[int]()
p2 := NewPromise[int]()
go func() {
time.Sleep(10 * time.Millisecond)
p1.Success(util.Ptr(1))
p1.Success(1)
time.Sleep(10 * time.Millisecond)
p2.Failure(errors.New("recover Future failure"))
}()
future := p1.Future().Map(func(_ *int) (*int, error) {
return nil, errors.New("map error")
}).FlatMap(func(_ *int) (Future[int], error) {
future := p1.Future().Map(func(_ int) (int, error) {
return 0, errors.New("map error")
}).FlatMap(func(_ int) (Future[int], error) {
p2 := NewPromise[int]()
p2.Failure(errors.New("flatMap Future failure"))
return p2.Future(), nil
}).FlatMap(func(_ *int) (Future[int], error) {
}).FlatMap(func(_ int) (Future[int], error) {
return nil, errors.New("flatMap error")
}).Recover(func() (*int, error) {
return nil, errors.New("recover error")
}).RecoverWith(p2.Future()).Recover(func() (*int, error) {
return util.Ptr(2), nil
}).Recover(func() (int, error) {
return 0, errors.New("recover error")
}).RecoverWith(p2.Future()).Recover(func() (int, error) {
return 2, nil
})

res, err := future.Join()
assert.Equal(t, 2, *res)
assert.Equal(t, 2, res)
assert.IsNil(t, err)
}

func TestFutureFailure(t *testing.T) {
p1 := NewPromise[int]()
p2 := NewPromise[int]()
func TestFuture_Failure(t *testing.T) {
p1 := NewPromise[*int]()
p2 := NewPromise[*int]()
go func() {
time.Sleep(10 * time.Millisecond)
p1.Failure(errors.New("Future error"))
Expand All @@ -128,11 +128,11 @@ func TestFutureFailure(t *testing.T) {
assert.Equal(t, 2, *res)
}

func TestFutureTimeout(t *testing.T) {
func TestFuture_Timeout(t *testing.T) {
p := NewPromise[bool]()
go func() {
time.Sleep(100 * time.Millisecond)
p.Success(util.Ptr(true))
p.Success(true)
}()
future := p.Future()

Expand All @@ -143,14 +143,14 @@ func TestFutureTimeout(t *testing.T) {
assert.ErrorContains(t, err, "timeout")
}

func TestFutureGoroutineLeak(t *testing.T) {
func TestFuture_GoroutineLeak(t *testing.T) {
var wg sync.WaitGroup

fmt.Println(runtime.NumGoroutine())

numFuture := 100
for i := 0; i < numFuture; i++ {
promise := NewPromise[string]()
promise := NewPromise[*string]()
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
Loading

0 comments on commit 6d948a5

Please sign in to comment.