Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: zero timeout on composed routers should disable timeout #72

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ func (r *composableParallel) Bootstrap(ctx context.Context) error {
})
}

func withCancelAndOptionalTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout != 0 {
return context.WithTimeout(ctx, timeout)
}
return context.WithCancel(ctx)
}

func getValueOrErrorParallel[T any](
ctx context.Context,
routers []*ParallelRouter,
Expand All @@ -177,7 +184,7 @@ func getValueOrErrorParallel[T any](
select {
case <-ctx.Done():
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
ctx, cancel := withCancelAndOptionalTimeout(ctx, r.Timeout)
defer cancel()
value, empty, err := f(ctx, r.Router)
if err != nil &&
Expand Down Expand Up @@ -269,8 +276,9 @@ func executeParallel(
errCh <- ctx.Err()
}
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
ctx, cancel := withCancelAndOptionalTimeout(ctx, r.Timeout)
defer cancel()

log.Debug("executeParallel: calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
Expand Down Expand Up @@ -335,8 +343,9 @@ func getChannelOrErrorParallel[T any](
)
return
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
ctx, cancel := withCancelAndOptionalTimeout(ctx, r.Timeout)
defer cancel()

log.Debug("getChannelOrErrorParallel: calling router function for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
Expand Down
22 changes: 22 additions & 0 deletions compparallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,28 @@ func TestComposableParallelFixtures(t *testing.T) {
}},
SearchValue: []searchValueFixture{{key: "a", ctx: canceledCtx, err: context.Canceled}},
},
{
Name: "timeout=0 should disable the timeout, two routers with one disabled timeout should timeout on the other router",
routers: []*ParallelRouter{
{
Timeout: 0,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, nil, nil),
},
},
{
Timeout: time.Second,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, []string{"a"}, []string{"av"}),
},
},
},
GetValue: []getValueFixture{
{key: "/wait/100ms/a", value: "av", searchValCount: 1},
},
},
}

for _, f := range fixtures {
Expand Down
17 changes: 8 additions & 9 deletions compsequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ func getValueOrErrorSequential[T any](
return value, ctxErr
}

ctx, cancel := context.WithTimeout(ctx, router.Timeout)
ctx, cancel := withCancelAndOptionalTimeout(ctx, router.Timeout)
defer cancel()

value, empty, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
Expand All @@ -184,14 +185,15 @@ func executeSequential(
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
ctx, cancel := context.WithTimeout(ctx, router.Timeout)

ctx, cancel := withCancelAndOptionalTimeout(ctx, router.Timeout)
defer cancel()

if err := f(ctx, router.Router); err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
return err
}
cancel()
}

return nil
Expand All @@ -211,13 +213,12 @@ func getChannelOrErrorSequential[T any](
close(chanOut)
return
}

ctx, cancel := context.WithTimeout(ctx, router.Timeout)
ctx, cancel := withCancelAndOptionalTimeout(ctx, router.Timeout)
defer cancel()
rch, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
break
}

Expand All @@ -238,8 +239,6 @@ func getChannelOrErrorSequential[T any](

}
}

cancel()
}

close(chanOut)
Expand Down
132 changes: 54 additions & 78 deletions compsequential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,31 @@ func TestNoResultsSequential(t *testing.T) {
}

func TestComposableSequentialFixtures(t *testing.T) {
type getValueFixture struct {
err error
key string
value string
searchValCount int
}
type putValueFixture struct {
err error
key string
value string
}
type provideFixture struct {
err error
}
type findPeerFixture struct {
peerID string
err error
}
fixtures := []struct {
Name string
routers []*SequentialRouter
GetValueFixtures []struct {
err error
key string
value string
searchValCount int
}
PutValueFixtures []struct {
err error
key string
value string
}
ProvideFixtures []struct {
err error
}
FindPeerFixtures []struct {
peerID string
err error
}
GetValueFixtures []getValueFixture
PutValueFixtures []putValueFixture
ProvideFixtures []provideFixture
FindPeerFixtures []findPeerFixture
}{
{
Name: "simple two routers",
Expand All @@ -85,12 +89,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -102,11 +101,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
searchValCount: 2,
},
},
PutValueFixtures: []struct {
err error
key string
value string
}{
PutValueFixtures: []putValueFixture{
{
err: errors.New("a"),
key: "/error/a",
Expand All @@ -117,17 +112,12 @@ func TestComposableSequentialFixtures(t *testing.T) {
value: "a",
},
},
ProvideFixtures: []struct {
err error
}{
ProvideFixtures: []provideFixture{
{
err: routing.ErrNotSupported,
},
},
FindPeerFixtures: []struct {
peerID string
err error
}{
FindPeerFixtures: []findPeerFixture{
{
peerID: "pid1",
},
Expand Down Expand Up @@ -158,12 +148,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -174,11 +159,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
key: "a",
},
},
PutValueFixtures: []struct {
err error
key string
value string
}{
PutValueFixtures: []putValueFixture{
{
key: "/error/x",
value: "xv",
Expand All @@ -188,10 +169,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
value: "yv",
},
},
FindPeerFixtures: []struct {
peerID string
err error
}{
FindPeerFixtures: []findPeerFixture{
{
peerID: "pid1",
},
Expand Down Expand Up @@ -223,12 +201,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -248,11 +221,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
key: "/error/y",
},
},
PutValueFixtures: []struct {
err error
key string
value string
}{
PutValueFixtures: []putValueFixture{
{
key: "/error/x",
value: "xv",
Expand All @@ -262,10 +231,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
value: "yv",
},
},
FindPeerFixtures: []struct {
peerID string
err error
}{
FindPeerFixtures: []findPeerFixture{
{
peerID: "pid1",
},
Expand Down Expand Up @@ -297,12 +263,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
err: errFailValue,
key: "d",
Expand Down Expand Up @@ -337,12 +298,7 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
GetValueFixtures: []struct {
err error
key string
value string
searchValCount int
}{
GetValueFixtures: []getValueFixture{
{
key: "d",
value: "dv",
Expand All @@ -355,6 +311,26 @@ func TestComposableSequentialFixtures(t *testing.T) {
},
},
},
{
Name: "timeout=0 should disable the timeout, two routers with one disabled timeout should timeout on the other router",
routers: []*SequentialRouter{
{
Timeout: 0,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, nil, nil),
},
},
{
Timeout: time.Minute,
IgnoreError: false,
Router: &Compose{
ValueStore: newDummyValueStore(t, []string{"a"}, []string{"av"}),
},
},
},
GetValueFixtures: []getValueFixture{{key: "/wait/100ms/a", value: "av", searchValCount: 1}},
},
}

for _, f := range fixtures {
Expand Down
22 changes: 22 additions & 0 deletions dummy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package routinghelpers
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -60,6 +62,26 @@ func (d *dummyValueStore) GetValue(ctx context.Context, key string, opts ...rout
<-ctx.Done()
return nil, ctx.Err()
}
// format: /wait/10s/key
// this will wait for the given duration and then perform the lookup normally on key,
// short circuiting if the context closes
if strings.HasPrefix(key, "/wait/") {
durationAndKey := strings.TrimPrefix(key, "/wait/")
split := strings.Split(durationAndKey, "/")
durationStr, key := split[0], split[1]
duration, err := time.ParseDuration(durationStr)
if err != nil {
return nil, fmt.Errorf("parsing wait duration: %w", err)
}
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-timer.C:
return d.GetValue(ctx, key, opts...)
case <-ctx.Done():
return nil, ctx.Err()
}
}
if v, ok := (*sync.Map)(d).Load(key); ok {
return v.([]byte), nil
}
Expand Down