Skip to content

Commit

Permalink
sciond: Change refresh mechanism to be safer (#3434)
Browse files Browse the repository at this point in the history
Instead of deleting next query entries, just always request at the PS.
This can prevent a weird interaction between concurrent requests, which would lead to 0 cached paths.

Marked as breaking because the behavior changes, i.e., sciond no longer deletes anything in the DB on the refresh flag.

Fixes #2871
Fixes #3416
  • Loading branch information
lukedirtwalker authored Nov 28, 2019
1 parent 595d8d8 commit 3ed3032
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 86 deletions.
2 changes: 1 addition & 1 deletion go/lib/infra/modules/segfetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (cfg FetcherConfig) New() *Fetcher {
return &Fetcher{
Validator: cfg.Validator,
Splitter: cfg.Splitter,
Resolver: NewResolver(cfg.PathDB, cfg.RevCache, !cfg.SciondMode),
Resolver: NewResolver(cfg.PathDB, cfg.RevCache),
Requester: &DefaultRequester{API: cfg.RequestAPI, DstProvider: cfg.DstProvider},
ReplyHandler: &seghandler.Handler{
Verifier: &seghandler.DefaultVerifier{Verifier: cfg.VerificationFactory.NewVerifier()},
Expand Down
3 changes: 3 additions & 0 deletions go/lib/infra/modules/segfetcher/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type RequestSet struct {
Up Request
Cores Requests
Down Request
// Fetch indicates the request should always be fetched from remote,
// regardless of whether is is cached.
Fetch bool
}

// IsLoaded returns true if all non-zero requests in the set are in state
Expand Down
73 changes: 46 additions & 27 deletions go/lib/infra/modules/segfetcher/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,17 @@ type Resolver interface {
// customized. E.g. a PS could inject a wrapper around GetNextQuery so that it
// always returns that the cache is up to date for segments that should be
// available local.
func NewResolver(DB pathdb.Read, revCache revcache.RevCache, ignoreRevs bool) *DefaultResolver {
func NewResolver(DB pathdb.Read, revCache revcache.RevCache) *DefaultResolver {
return &DefaultResolver{
DB: DB,
RevCache: revCache,
IgnoreRevs: ignoreRevs,
DB: DB,
RevCache: revCache,
}
}

// DefaultResolver is the default resolver implementation.
type DefaultResolver struct {
DB pathdb.Read
RevCache revcache.RevCache
IgnoreRevs bool
DB pathdb.Read
RevCache revcache.RevCache
}

// Resolve resolves a request set. It returns the segments that are locally
Expand Down Expand Up @@ -106,23 +104,27 @@ func (r *DefaultResolver) Resolve(ctx context.Context, segs Segments,
if err != nil {
return segs, req, err
}
coreSegs, filtered, err := r.resultsToSegs(ctx, coreRes)
allRev, err := r.allRevoked(ctx, coreRes)
if err != nil {
return segs, req, err
}
if len(coreSegs) == 0 && filtered > 0 && coreReq.State != Fetched {
if allRev && coreReq.State != Fetched {
req.Cores[i].State = Fetch
} else {
req.Cores[i].State = Loaded
}
segs.Core = append(segs.Core, coreSegs...)
segs.Core = append(segs.Core, coreRes.Segs()...)
}
return segs, req, nil
}

func (r *DefaultResolver) resolveUpSegs(ctx context.Context, segs Segments,
req RequestSet) (Segments, RequestSet, error) {

if req.Fetch && req.Up.State == Unresolved {
req.Up.State = Fetch
return segs, req, nil
}
var err error
segs.Up, req.Up, err = r.resolveSegment(ctx, req.Up, false)
return segs, req, err
Expand All @@ -131,6 +133,10 @@ func (r *DefaultResolver) resolveUpSegs(ctx context.Context, segs Segments,
func (r *DefaultResolver) resolveDownSegs(ctx context.Context, segs Segments,
req RequestSet) (Segments, RequestSet, error) {

if req.Fetch && req.Down.State == Unresolved {
req.Down.State = Fetch
return segs, req, nil
}
var err error
segs.Down, req.Down, err = r.resolveSegment(ctx, req.Down, true)
return segs, req, err
Expand Down Expand Up @@ -160,18 +166,17 @@ func (r *DefaultResolver) resolveSegment(ctx context.Context,
if err != nil {
return nil, req, err
}
segs, filtered, err := r.resultsToSegs(ctx, res)
allRev, err := r.allRevoked(ctx, res)
if err != nil {
return res.Segs(), req, err
}
// because of revocations our cache is empty, so refetch
if len(segs) == 0 && filtered > 0 {
if req.State == Unresolved {
req.State = Fetch
} else {
req.State = Loaded
}
return segs, req, err
if allRev && req.State == Unresolved {
req.State = Fetch
return nil, req, err
}
req.State = Loaded
return segs, req, err
return res.Segs(), req, err
}

func (r *DefaultResolver) needsFetching(ctx context.Context, req Request) (bool, error) {
Expand Down Expand Up @@ -220,11 +225,14 @@ func (r *DefaultResolver) expandNonCoreToCore(segs Segments,
// already resolved
return req.Cores, nil
}
if req.Fetch && coreReq.State == Unresolved {
coreReq.State = Fetch
}
upIAs := segs.Up.FirstIAs()
coreReqs := make([]Request, 0, len(upIAs))
for _, upIA := range upIAs {
if !upIA.Equal(coreReq.Dst) {
coreReqs = append(coreReqs, Request{Src: upIA, Dst: coreReq.Dst})
coreReqs = append(coreReqs, Request{State: coreReq.State, Src: upIA, Dst: coreReq.Dst})
}
}
return coreReqs, nil
Expand All @@ -239,11 +247,15 @@ func (r *DefaultResolver) expandCoreToNonCore(segs Segments,
// already resolved
return req.Cores, nil
}
if req.Fetch && coreReq.State == Unresolved {
coreReq.State = Fetch
}
downIAs := segs.Down.FirstIAs()
coreReqs := make([]Request, 0, len(downIAs))
for _, downIA := range downIAs {
if !downIA.Equal(coreReq.Src) {
coreReqs = append(coreReqs, Request{Src: coreReq.Src, Dst: downIA})
coreReqs = append(coreReqs,
Request{State: coreReq.State, Src: coreReq.Src, Dst: downIA})
}
}
return coreReqs, nil
Expand All @@ -262,13 +274,16 @@ func (r *DefaultResolver) expandNonCoreToNonCore(segs Segments,
return nil, serrors.WithCtx(ErrInvalidRequest,
"req", req, "reason", "Core either src & dst should be wildcard or none.")
}
if req.Fetch && coreReq.State == Unresolved {
coreReq.State = Fetch
}
upIAs := segs.Up.FirstIAs()
downIAs := segs.Down.FirstIAs()
var coreReqs []Request
for _, upIA := range upIAs {
for _, downIA := range downIAs {
if !upIA.Equal(downIA) {
coreReqs = append(coreReqs, Request{Src: upIA, Dst: downIA})
coreReqs = append(coreReqs, Request{State: coreReq.State, Src: upIA, Dst: downIA})
}
}
}
Expand All @@ -282,7 +297,11 @@ func (r *DefaultResolver) resolveCores(ctx context.Context,
needsFetching := make(map[Request]bool)
for i, coreReq := range req.Cores {
if coreReq.State == Fetched {
needsFetching[coreReq] = false
req.Cores[i].State = Cached
continue
}
if coreReq.State == Fetch {
continue
}
coreFetch, ok := needsFetching[coreReq]
if !ok {
Expand All @@ -301,17 +320,17 @@ func (r *DefaultResolver) resolveCores(ctx context.Context,
return req.Cores, nil
}

func (r *DefaultResolver) resultsToSegs(ctx context.Context,
results query.Results) (seg.Segments, int, error) {
func (r *DefaultResolver) allRevoked(ctx context.Context,
results query.Results) (bool, error) {

segs := results.Segs()
filtered, err := segs.FilterSegsErr(func(ps *seg.PathSegment) (bool, error) {
return revcache.NoRevokedHopIntf(ctx, r.RevCache, ps)
})
if err != nil {
return nil, 0, err
return false, err
}
return segs, filtered, nil
return len(segs) == 0 && filtered > 0, nil
}

func zeroUpDownSegsCached(r RequestSet, segs Segments) bool {
Expand Down
75 changes: 67 additions & 8 deletions go/lib/infra/modules/segfetcher/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (rt resolverTest) run(t *testing.T) {
} else {
revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes()
}
resolver := segfetcher.NewResolver(db, revCache, false)
resolver := segfetcher.NewResolver(db, revCache)
segs, remainingReqs, err := resolver.Resolve(context.Background(), rt.Segs, rt.Req)
assert.Equal(t, rt.ExpectedSegments, segs)
assert.Equal(t, rt.ExpectedReqSet, remainingReqs)
Expand Down Expand Up @@ -693,6 +693,67 @@ func TestResolver(t *testing.T) {
}
}

func TestResolverCacheBypass(t *testing.T) {
rootCtrl := gomock.NewController(t)
defer rootCtrl.Finish()
tg := newTestGraph(rootCtrl)
// futureT := time.Now().Add(2 * time.Minute)

tests := map[string]resolverTest{
"Up(cache-bypass) Core Down(cache-bypass)": {
Req: segfetcher.RequestSet{
Up: segfetcher.Request{Src: non_core_211, Dst: isd2},
Cores: []segfetcher.Request{{Src: isd2, Dst: isd1}},
Down: segfetcher.Request{Src: isd1, Dst: non_core_111},
Fetch: true,
},
ExpectCalls: func(db *mock_pathdb.MockPathDB) {},
ExpectedSegments: segfetcher.Segments{},
ExpectedReqSet: segfetcher.RequestSet{
Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Fetch},
Cores: []segfetcher.Request{{Src: isd2, Dst: isd1}},
Down: segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Fetch},
Fetch: true,
},
},
"Up(fetched) Core Down(fetched)": {
Req: segfetcher.RequestSet{
Up: segfetcher.Request{State: segfetcher.Fetched, Src: non_core_211, Dst: isd2},
Cores: []segfetcher.Request{{Src: isd2, Dst: isd1}},
Down: segfetcher.Request{State: segfetcher.Fetched, Src: isd1, Dst: non_core_111},
Fetch: true,
},
ExpectCalls: func(db *mock_pathdb.MockPathDB) {
// cached up segments
db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{
SegTypes: []proto.PathSegType{proto.PathSegType_up},
StartsAt: []addr.IA{isd2}, EndsAt: []addr.IA{non_core_211},
})).Return(resultsFromSegs(tg.seg210_211), nil)
db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{
SegTypes: []proto.PathSegType{proto.PathSegType_down},
StartsAt: []addr.IA{isd1}, EndsAt: []addr.IA{non_core_111},
})).Return(resultsFromSegs(tg.seg120_111, tg.seg130_111), nil)
},
ExpectedSegments: segfetcher.Segments{
Up: seg.Segments{tg.seg210_211},
Down: seg.Segments{tg.seg120_111, tg.seg130_111},
},
ExpectedReqSet: segfetcher.RequestSet{
Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Loaded},
Cores: []segfetcher.Request{
{Src: core_210, Dst: core_120, State: segfetcher.Fetch},
{Src: core_210, Dst: core_130, State: segfetcher.Fetch},
},
Down: segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Loaded},
Fetch: true,
},
},
}
for name, test := range tests {
t.Run(name, test.run)
}
}

func TestResolverWithRevocations(t *testing.T) {
rootCtrl := gomock.NewController(t)
defer rootCtrl.Finish()
Expand Down Expand Up @@ -727,14 +788,14 @@ func TestResolverWithRevocations(t *testing.T) {
revoke(t, revCache, key111_130)
revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes()
},
ExpectedSegments: segfetcher.Segments{
Up: seg.Segments{},
},
// On the initial fetch, if everything is revoked, just try again
// and fetch it.
ExpectedSegments: segfetcher.Segments{},
ExpectedReqSet: segfetcher.RequestSet{
Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch},
},
},
"Core (cached) with revocations returns partial result": {
"Core (cached) with revocations returns full result": {
Req: segfetcher.RequestSet{
Cores: []segfetcher.Request{
{Src: core_210, Dst: core_110},
Expand All @@ -756,8 +817,6 @@ func TestResolverWithRevocations(t *testing.T) {
// Other calls return 0
db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(2)
},
// Revoke the shorter path via 110, so it should only return the
// longer path via 120.
ExpectRevcache: func(t *testing.T, revCache *mock_revcache.MockRevCache) {
key110 := revcache.Key{IA: core_110, IfId: graph.If_110_X_130_A}
ksMatcher := keySetContains{keys: []revcache.Key{key110}}
Expand All @@ -769,7 +828,7 @@ func TestResolverWithRevocations(t *testing.T) {
revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes()
},
ExpectedSegments: segfetcher.Segments{
Core: seg.Segments{tg.seg210_130_2},
Core: seg.Segments{tg.seg210_130, tg.seg210_130_2},
},
ExpectedReqSet: segfetcher.RequestSet{
Cores: []segfetcher.Request{
Expand Down
1 change: 0 additions & 1 deletion go/sciond/internal/fetcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"//go/lib/infra/modules/segfetcher:go_default_library",
"//go/lib/log:go_default_library",
"//go/lib/pathdb:go_default_library",
"//go/lib/pathdb/query:go_default_library",
"//go/lib/pathpol:go_default_library",
"//go/lib/revcache:go_default_library",
"//go/lib/sciond:go_default_library",
Expand Down
Loading

0 comments on commit 3ed3032

Please sign in to comment.