diff --git a/go/lib/infra/modules/segfetcher/fetcher.go b/go/lib/infra/modules/segfetcher/fetcher.go index 57aed9b244..2b2a57b52f 100644 --- a/go/lib/infra/modules/segfetcher/fetcher.go +++ b/go/lib/infra/modules/segfetcher/fetcher.go @@ -122,7 +122,7 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error) } log.FromCtx(ctx).Trace("After resolving", "req", reqSet, "segs", segs, "iteration", i+1) - if reqSet.IsEmpty() { + if reqSet.IsLoaded() { break } // in 3 iteration (i==2) everything should be resolved. @@ -138,7 +138,7 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error) reqCtx, cancelF := context.WithTimeout(ctx, 3*time.Second) replies := f.Requester.Request(reqCtx, reqSet) // TODO(lukedirtwalker): We need to have early trigger for the last request. - if err := f.waitOnProcessed(ctx, replies); err != nil { + if reqSet, err = f.waitOnProcessed(ctx, replies, reqSet); err != nil { cancelF() return Segments{}, err } @@ -147,12 +147,14 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error) return segs, nil } -func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr) error { +func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr, + reqSet RequestSet) (RequestSet, error) { + logger := log.FromCtx(ctx) for reply := range replies { // TODO(lukedirtwalker): Should we do this in go routines? if reply.Err != nil { - return reply.Err + return reqSet, reply.Err } if reply.Reply == nil || reply.Reply.Recs == nil { continue @@ -161,29 +163,30 @@ func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr select { case <-r.FullReplyProcessed(): if err := r.Err(); err != nil { - return err + return reqSet, err + } + // TODO(lukedirtwalker): move state update to separate func + if reqSet.Up.EqualAddr(reply.Req) { + reqSet.Up.State = Fetched + } else if reqSet.Down.EqualAddr(reply.Req) { + reqSet.Down.State = Fetched + } else { + for i, coreReq := range reqSet.Cores { + if coreReq.EqualAddr(reply.Req) { + reqSet.Cores[i].State = Fetched + } + } } - // for _, rev := range r.Stats().VerifiedRevs { - // revInfo, err := rev.RevInfo() - // if err != nil { - // logger.Warn("Failed to extract rev info from verified rev", - // "err", err, "rev", rev) - // continue - // } - // // TODO(lukedirtwalker): collect all revInfos and delete only - // // once. - // f.NextQueryCleaner.ResetQueryCache(ctx, revInfo) - // } nextQuery := f.nextQuery(r.Stats().VerifiedSegs) _, err := f.PathDB.InsertNextQuery(ctx, reply.Req.Src, reply.Req.Dst, nil, nextQuery) if err != nil { logger.Warn("Failed to insert next query", "err", err) } case <-ctx.Done(): - return ctx.Err() + return reqSet, ctx.Err() } } - return nil + return reqSet, nil } func (f *Fetcher) verifyServer(reply ReplyOrErr) net.Addr { diff --git a/go/lib/infra/modules/segfetcher/request.go b/go/lib/infra/modules/segfetcher/request.go index ca40fadc8b..a89b2150a9 100644 --- a/go/lib/infra/modules/segfetcher/request.go +++ b/go/lib/infra/modules/segfetcher/request.go @@ -19,10 +19,28 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" ) +// RequestState is the state the request is in. +type RequestState int + +const ( + // Unresolved means the request is not yet resolved. + Unresolved RequestState = iota + // Fetch means the request needs to be fetched. + Fetch + // Cached means the request should be cached locally and can be loaded from + // DB. + Cached + // Fetched means the request has been fetched and should be in the DB. + Fetched + // Loaded means the request has been loaded from the DB. + Loaded +) + // Request represents a path or segment request. type Request struct { - Src addr.IA - Dst addr.IA + Src addr.IA + Dst addr.IA + State RequestState } // IsZero returns whether the request is empty. @@ -38,6 +56,11 @@ func (r Request) ToSegReq() *path_mgmt.SegReq { } } +// EqualAddr returns whether the two request refer to the same src/dst. +func (r Request) EqualAddr(other Request) bool { + return r.Src.Equal(other.Src) && r.Dst.Equal(other.Dst) +} + // RequestSet is a set of requests. type RequestSet struct { Up Request @@ -45,9 +68,12 @@ type RequestSet struct { Down Request } -// IsEmpty returns whether the request set is empty. -func (r RequestSet) IsEmpty() bool { - return r.Up.IsZero() && r.Cores.IsEmpty() && r.Down.IsZero() +// IsLoaded returns true if all non-zero requests in the set are in state +// loaded. +func (r RequestSet) IsLoaded() bool { + return (r.Up.IsZero() || r.Up.State == Loaded) && + (r.Down.IsZero() || r.Down.State == Loaded) && + r.Cores.AllLoaded() } // Requests is a list of requests and provides some convenience methods on top @@ -69,6 +95,16 @@ func (r Requests) IsEmpty() bool { return len(r) == 0 } +// AllLoaded returns whether all entries in request have state loaded. +func (r Requests) AllLoaded() bool { + for _, req := range r { + if req.State != Loaded { + return false + } + } + return true +} + func (r Requests) extractIAs(extract func(Request) addr.IA) []addr.IA { set := make(map[addr.IA]struct{}) for _, req := range r { diff --git a/go/lib/infra/modules/segfetcher/requester.go b/go/lib/infra/modules/segfetcher/requester.go index 8ff5574865..c7a76c3f9e 100644 --- a/go/lib/infra/modules/segfetcher/requester.go +++ b/go/lib/infra/modules/segfetcher/requester.go @@ -54,21 +54,16 @@ type DefaultRequester struct { DstProvider DstProvider } -// Request the missing segments from the remote. Note that this might only -// fetch a part of the full request set, i.e. if up or down segments are set, -// cores are not yet fetched, assuming the cores are not resolved. +// Request all requests in the request set that are in fetch state. func (r *DefaultRequester) Request(ctx context.Context, req RequestSet) <-chan ReplyOrErr { - switch { - case req.Up.IsZero() && req.Down.IsZero(): - // only cores to fetch - return r.fetchReqs(ctx, req.Cores) - case req.Up.IsZero() && !req.Down.IsZero(): - return r.fetchReqs(ctx, Requests{req.Down}) - case !req.Up.IsZero() && req.Down.IsZero(): - return r.fetchReqs(ctx, Requests{req.Up}) - default: - return r.fetchReqs(ctx, Requests{req.Up, req.Down}) + var reqs Requests + allReqs := append(Requests{req.Up, req.Down}, req.Cores...) + for _, req := range allReqs { + if req.State == Fetch { + reqs = append(reqs, req) + } } + return r.fetchReqs(ctx, reqs) } func (r *DefaultRequester) fetchReqs(ctx context.Context, reqs Requests) <-chan ReplyOrErr { diff --git a/go/lib/infra/modules/segfetcher/requester_test.go b/go/lib/infra/modules/segfetcher/requester_test.go index 0d5361dca7..1f132f66d9 100644 --- a/go/lib/infra/modules/segfetcher/requester_test.go +++ b/go/lib/infra/modules/segfetcher/requester_test.go @@ -45,15 +45,15 @@ var ( non_core_211 = xtest.MustParseIA("2-ff00:0:211") non_core_212 = xtest.MustParseIA("2-ff00:0:212") - req_111_1 = segfetcher.Request{Src: non_core_111, Dst: isd1} - req_1_111 = segfetcher.Request{Src: isd1, Dst: non_core_111} + req_111_1 = segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch} + req_1_111 = segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Fetch} req_1_2 = segfetcher.Request{Src: isd1, Dst: isd2} req_1_210 = segfetcher.Request{Src: isd1, Dst: core_210} - req_2_211 = segfetcher.Request{Src: isd2, Dst: non_core_211} + req_2_211 = segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Fetch} req_210_1 = segfetcher.Request{Src: core_210, Dst: isd1} - req_210_110 = segfetcher.Request{Src: core_210, Dst: core_110} - req_210_120 = segfetcher.Request{Src: core_210, Dst: core_120} - req_210_130 = segfetcher.Request{Src: core_210, Dst: core_130} + req_210_110 = segfetcher.Request{Src: core_210, Dst: core_110, State: segfetcher.Fetch} + req_210_120 = segfetcher.Request{Src: core_210, Dst: core_120, State: segfetcher.Fetch} + req_210_130 = segfetcher.Request{Src: core_210, Dst: core_130, State: segfetcher.Fetch} ) func TestRequester(t *testing.T) { @@ -85,9 +85,7 @@ func TestRequester(t *testing.T) { } api.EXPECT().GetSegs(gomock.Any(), gomock.Eq(req), gomock.Any(), gomock.Any()). Return(reply, nil) - return []segfetcher.ReplyOrErr{ - {Req: segfetcher.Request{Src: non_core_111, Dst: isd1}, Reply: reply}, - } + return []segfetcher.ReplyOrErr{{Req: req_111_1, Reply: reply}} }, }, "Down only": { diff --git a/go/lib/infra/modules/segfetcher/resolver.go b/go/lib/infra/modules/segfetcher/resolver.go index befdba9b2a..a4c5ec88e8 100644 --- a/go/lib/infra/modules/segfetcher/resolver.go +++ b/go/lib/infra/modules/segfetcher/resolver.go @@ -62,19 +62,21 @@ func (r *DefaultResolver) Resolve(ctx context.Context, segs Segments, req RequestSet) (Segments, RequestSet, error) { var err error - if !req.Up.IsZero() { + if !req.Up.IsZero() && (req.Up.State == Unresolved || req.Up.State == Fetched) { if segs, req, err = r.resolveUpSegs(ctx, segs, req); err != nil { return segs, req, err } } - if !req.Down.IsZero() { + if !req.Down.IsZero() && (req.Down.State == Unresolved || req.Down.State == Fetched) { if segs, req, err = r.resolveDownSegs(ctx, segs, req); err != nil { return segs, req, err } } // If there are still up or down segments to request, or if there are no // core segments no more action can be done here. - if !req.Up.IsZero() || !req.Down.IsZero() || req.Cores.IsEmpty() { + if (!req.Up.IsZero() && req.Up.State != Loaded) || + (!req.Down.IsZero() && req.Down.State != Loaded) || + req.Cores.IsEmpty() { return segs, req, nil } // now resolve core segs: @@ -82,26 +84,34 @@ func (r *DefaultResolver) Resolve(ctx context.Context, segs Segments, if err != nil { return segs, req, err } - var cachedReqs Requests - if req.Cores, cachedReqs, err = r.resolveCores(ctx, req); err != nil { + if req.Cores, err = r.resolveCores(ctx, req); err != nil { return segs, req, err } if len(req.Cores) == 0 { req.Cores = nil } - if len(cachedReqs) > 0 { + for i, coreReq := range req.Cores { + if coreReq.State != Cached && coreReq.State != Fetched { + continue + } coreRes, err := r.DB.Get(ctx, &query.Params{ - StartsAt: cachedReqs.DstIAs(), - EndsAt: cachedReqs.SrcIAs(), + StartsAt: []addr.IA{coreReq.Dst}, + EndsAt: []addr.IA{coreReq.Src}, SegTypes: []proto.PathSegType{proto.PathSegType_core}, }) if err != nil { return segs, req, err } - segs.Core, err = r.resultsToSegs(ctx, coreRes) + coreSegs, filtered, err := r.resultsToSegs(ctx, coreRes) if err != nil { return segs, req, err } + if len(coreSegs) == 0 && filtered > 0 && coreReq.State != Fetched { + req.Cores[i].State = Fetch + } else { + req.Cores[i].State = Loaded + } + segs.Core = append(segs.Core, coreSegs...) } return segs, req, nil } @@ -125,9 +135,12 @@ func (r *DefaultResolver) resolveDownSegs(ctx context.Context, segs Segments, func (r *DefaultResolver) resolveSegment(ctx context.Context, req Request, consDir bool) (seg.Segments, Request, error) { - fetch, err := r.needsFetching(ctx, req) - if err != nil || fetch { - return nil, req, err + if req.State == Unresolved { + fetch, err := r.needsFetching(ctx, req) + if err != nil || fetch { + req.State = Fetch + return nil, req, err + } } start, end := req.Src, req.Dst segType := proto.PathSegType_down @@ -143,8 +156,18 @@ func (r *DefaultResolver) resolveSegment(ctx context.Context, if err != nil { return nil, req, err } - segs, err := r.resultsToSegs(ctx, res) - return segs, Request{}, err + segs, filtered, err := r.resultsToSegs(ctx, res) + // because of revocations our cache is empty, so refetch + if len(segs) == 0 && filtered > 0 { + if req.State == Unresolved { + req.State = Fetch + } else { + req.State = Loaded + } + return segs, req, err + } + req.State = Loaded + return segs, req, err } func (r *DefaultResolver) needsFetching(ctx context.Context, req Request) (bool, error) { @@ -248,44 +271,41 @@ func (r *DefaultResolver) expandNonCoreToNonCore(segs Segments, return coreReqs, nil } -// resolveCores returns cores that need to be requested and the ones which are -// already cached. +// resolveCores returns cores requests classified with a state. func (r *DefaultResolver) resolveCores(ctx context.Context, - req RequestSet) (Requests, Requests, error) { + req RequestSet) (Requests, error) { - var cachedReqs Requests - remainingCores := req.Cores[:0] needsFetching := make(map[Request]bool) - for _, coreReq := range req.Cores { + for i, coreReq := range req.Cores { + if coreReq.State == Fetched { + needsFetching[coreReq] = false + } coreFetch, ok := needsFetching[coreReq] if !ok { var err error if coreFetch, err = r.needsFetching(ctx, coreReq); err != nil { - return remainingCores, cachedReqs, err + return req.Cores, err } needsFetching[coreReq] = coreFetch } if coreFetch { - remainingCores = append(remainingCores, coreReq) + req.Cores[i].State = Fetch } else { - cachedReqs = append(cachedReqs, coreReq) + req.Cores[i].State = Cached } } - return remainingCores, cachedReqs, nil + return req.Cores, nil } func (r *DefaultResolver) resultsToSegs(ctx context.Context, - results query.Results) (seg.Segments, error) { + results query.Results) (seg.Segments, int, error) { segs := results.Segs() - if r.IgnoreRevs { - return segs, nil - } - _, err := segs.FilterSegsErr(func(ps *seg.PathSegment) (bool, error) { + filtered, err := segs.FilterSegsErr(func(ps *seg.PathSegment) (bool, error) { return revcache.NoRevokedHopIntf(ctx, r.RevCache, ps) }) if err != nil { - return nil, err + return nil, 0, err } - return segs, nil + return segs, filtered, nil } diff --git a/go/lib/infra/modules/segfetcher/resolver_test.go b/go/lib/infra/modules/segfetcher/resolver_test.go index ead221f9a8..30ce78bd22 100644 --- a/go/lib/infra/modules/segfetcher/resolver_test.go +++ b/go/lib/infra/modules/segfetcher/resolver_test.go @@ -88,13 +88,13 @@ func (rt resolverTest) run(t *testing.T) { db := mock_pathdb.NewMockPathDB(ctrl) rt.ExpectCalls(db) - var revCache revcache.RevCache + revCache := mock_revcache.NewMockRevCache(ctrl) if rt.ExpectRevcache != nil { - mRevCache := mock_revcache.NewMockRevCache(ctrl) - rt.ExpectRevcache(t, mRevCache) - revCache = mRevCache + rt.ExpectRevcache(t, revCache) + } else { + revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes() } - resolver := segfetcher.NewResolver(db, revCache, revCache == nil) + resolver := segfetcher.NewResolver(db, revCache, false) segs, remainingReqs, err := resolver.Resolve(context.Background(), rt.Segs, rt.Req) assert.Equal(t, rt.ExpectedSegments, segs) assert.Equal(t, rt.ExpectedReqSet, remainingReqs) @@ -118,7 +118,7 @@ func TestResolver(t *testing.T) { gomock.Eq(isd1), gomock.Any()) }, ExpectedReqSet: segfetcher.RequestSet{ - Up: segfetcher.Request{Src: non_core_111, Dst: isd1}, + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch}, }, }, "Up wildcard (cached)": { @@ -137,6 +137,26 @@ func TestResolver(t *testing.T) { ExpectedSegments: segfetcher.Segments{ Up: seg.Segments{tg.seg120_111, tg.seg130_111}, }, + ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, + }, + }, + "Up wildcard (fetched)": { + Req: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetched}, + }, + ExpectCalls: func(db *mock_pathdb.MockPathDB) { + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_up}, + StartsAt: []addr.IA{isd1}, EndsAt: []addr.IA{non_core_111}, + })).Return(resultsFromSegs(tg.seg120_111, tg.seg130_111), nil) + }, + ExpectedSegments: segfetcher.Segments{ + Up: seg.Segments{tg.seg120_111, tg.seg130_111}, + }, + ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, + }, }, "Up Core": { Req: segfetcher.RequestSet{ @@ -149,8 +169,10 @@ func TestResolver(t *testing.T) { gomock.Eq(isd1), gomock.Any()) }, ExpectedReqSet: segfetcher.RequestSet{ - Up: segfetcher.Request{Src: non_core_111, Dst: isd1}, - Cores: []segfetcher.Request{{Src: isd1, Dst: core_110}}, + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch}, + Cores: []segfetcher.Request{ + {Src: isd1, Dst: core_110, State: segfetcher.Unresolved}, + }, }, }, "Up(cached) Core": { @@ -174,9 +196,10 @@ func TestResolver(t *testing.T) { }, ExpectedSegments: segfetcher.Segments{Up: seg.Segments{tg.seg120_111, tg.seg130_111}}, ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, Cores: []segfetcher.Request{ - {Src: core_120, Dst: core_110}, - {Src: core_130, Dst: core_110}, + {Src: core_120, Dst: core_110, State: segfetcher.Fetch}, + {Src: core_130, Dst: core_110, State: segfetcher.Fetch}, }, }, }, @@ -200,16 +223,62 @@ func TestResolver(t *testing.T) { gomock.Eq(core_110), gomock.Any()).Return(futureT, nil) db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ SegTypes: []proto.PathSegType{proto.PathSegType_core}, - StartsAt: []addr.IA{core_110}, EndsAt: []addr.IA{core_120, core_130}, - })).Return(resultsFromSegs(tg.seg110_120, tg.seg110_130), nil) + StartsAt: []addr.IA{core_110}, EndsAt: []addr.IA{core_120}, + })).Return(resultsFromSegs(tg.seg110_120), nil) + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_core}, + StartsAt: []addr.IA{core_110}, EndsAt: []addr.IA{core_130}, + })).Return(resultsFromSegs(tg.seg110_130), nil) }, ExpectedSegments: segfetcher.Segments{ Up: seg.Segments{tg.seg120_111, tg.seg130_111}, Core: seg.Segments{tg.seg110_120, tg.seg110_130}, }, + ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, + Cores: []segfetcher.Request{ + {Src: core_120, Dst: core_110, State: segfetcher.Loaded}, + {Src: core_130, Dst: core_110, State: segfetcher.Loaded}, + }, + }, + }, + "Up(fetched) Core(fetched)": { + Req: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetched}, + Cores: []segfetcher.Request{ + {Src: core_120, Dst: core_110, State: segfetcher.Fetched}, + {Src: core_130, Dst: core_110, State: segfetcher.Fetched}, + }, + }, + ExpectCalls: func(db *mock_pathdb.MockPathDB) { + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_up}, + StartsAt: []addr.IA{isd1}, EndsAt: []addr.IA{non_core_111}, + })).Return(resultsFromSegs(tg.seg120_111, tg.seg130_111), nil) + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_core}, + StartsAt: []addr.IA{core_110}, EndsAt: []addr.IA{core_120}, + })).Return(resultsFromSegs(tg.seg110_120), nil) + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_core}, + StartsAt: []addr.IA{core_110}, EndsAt: []addr.IA{core_130}, + })).Return(resultsFromSegs(tg.seg110_130), nil) + }, + ExpectedSegments: segfetcher.Segments{ + Up: seg.Segments{tg.seg120_111, tg.seg130_111}, + Core: seg.Segments{tg.seg110_120, tg.seg110_130}, + }, + ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, + Cores: []segfetcher.Request{ + {Src: core_120, Dst: core_110, State: segfetcher.Loaded}, + {Src: core_130, Dst: core_110, State: segfetcher.Loaded}, + }, + }, }, "Up(passed) Core": { Req: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, Cores: []segfetcher.Request{{Src: core_120, Dst: core_110}}, }, Segs: segfetcher.Segments{ @@ -223,8 +292,9 @@ func TestResolver(t *testing.T) { Up: seg.Segments{tg.seg120_111}, }, ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, Cores: []segfetcher.Request{ - {Src: core_120, Dst: core_110}, + {Src: core_120, Dst: core_110, State: segfetcher.Fetch}, }, }, }, @@ -239,9 +309,9 @@ func TestResolver(t *testing.T) { db.EXPECT().GetNextQuery(gomock.Any(), isd2, non_core_212, gomock.Any()) }, ExpectedReqSet: segfetcher.RequestSet{ - Up: segfetcher.Request{Src: non_core_211, Dst: isd2}, + Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Fetch}, Cores: []segfetcher.Request{{Src: isd2, Dst: isd2}}, - Down: segfetcher.Request{Src: isd2, Dst: non_core_212}, + Down: segfetcher.Request{Src: isd2, Dst: non_core_212, State: segfetcher.Fetch}, }, }, "Up(cached) down(cached)": { @@ -268,6 +338,10 @@ func TestResolver(t *testing.T) { Up: seg.Segments{tg.seg210_211}, Down: seg.Segments{tg.seg210_212}, }, + ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Loaded}, + Down: segfetcher.Request{Src: isd2, Dst: non_core_212, State: segfetcher.Loaded}, + }, }, "Up Core Down": { Req: segfetcher.RequestSet{ @@ -284,9 +358,9 @@ func TestResolver(t *testing.T) { gomock.Eq(non_core_211), gomock.Any()) }, ExpectedReqSet: segfetcher.RequestSet{ - Up: segfetcher.Request{Src: non_core_111, Dst: isd1}, + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch}, Cores: []segfetcher.Request{{Src: isd1, Dst: isd2}}, - Down: segfetcher.Request{Src: isd2, Dst: non_core_211}, + Down: segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Fetch}, }, }, "Up(cached) Core Down": { @@ -310,8 +384,9 @@ func TestResolver(t *testing.T) { Up: seg.Segments{tg.seg120_111, tg.seg130_111}, }, ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Loaded}, Cores: []segfetcher.Request{{Src: isd1, Dst: isd2}}, - Down: segfetcher.Request{Src: isd2, Dst: non_core_211}, + Down: segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Fetch}, }, }, "Up(cached) Core Down(cached)": { @@ -346,15 +421,19 @@ func TestResolver(t *testing.T) { Down: seg.Segments{tg.seg120_111, tg.seg130_111}, }, ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Loaded}, Cores: []segfetcher.Request{ - {Src: core_210, Dst: core_120}, - {Src: core_210, Dst: core_130}, + {Src: core_210, Dst: core_120, State: segfetcher.Fetch}, + {Src: core_210, Dst: core_130, State: segfetcher.Fetch}, }, + Down: segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Loaded}, }, }, "Up(passed) Core Down(passed)": { Req: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Loaded}, Cores: []segfetcher.Request{{Src: isd2, Dst: isd1}}, + Down: segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Loaded}, }, Segs: segfetcher.Segments{ Up: seg.Segments{tg.seg210_211}, @@ -369,9 +448,11 @@ func TestResolver(t *testing.T) { Down: seg.Segments{tg.seg120_111}, }, ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_211, Dst: isd2, State: segfetcher.Loaded}, Cores: []segfetcher.Request{ - {Src: core_210, Dst: core_120}, + {Src: core_210, Dst: core_120, State: segfetcher.Fetch}, }, + Down: segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Loaded}, }, }, "Core(partial cached)": { @@ -399,8 +480,9 @@ func TestResolver(t *testing.T) { }, ExpectedReqSet: segfetcher.RequestSet{ Cores: []segfetcher.Request{ - {Src: core_210, Dst: core_110}, - {Src: core_210, Dst: core_120}, + {Src: core_210, Dst: core_110, State: segfetcher.Fetch}, + {Src: core_210, Dst: core_120, State: segfetcher.Fetch}, + {Src: core_210, Dst: core_130, State: segfetcher.Loaded}, }, }, }, @@ -421,48 +503,21 @@ func TestResolver(t *testing.T) { gomock.Eq(core_130), gomock.Any()).Return(futureT, nil) db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ SegTypes: []proto.PathSegType{proto.PathSegType_core}, - StartsAt: []addr.IA{core_110, core_120, core_130}, EndsAt: []addr.IA{core_210}, + StartsAt: []addr.IA{core_130}, EndsAt: []addr.IA{core_210}, })).Return(resultsFromSegs(tg.seg210_130), nil) + // no result cached for the others + db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(2) }, ExpectedSegments: segfetcher.Segments{ Core: seg.Segments{tg.seg210_130}, }, - }, - "Core (cached) with revocations": { - Req: segfetcher.RequestSet{ + ExpectedReqSet: segfetcher.RequestSet{ Cores: []segfetcher.Request{ - {Src: core_210, Dst: core_110}, - {Src: core_210, Dst: core_120}, - {Src: core_210, Dst: core_130}, + {Src: core_210, Dst: core_110, State: segfetcher.Loaded}, + {Src: core_210, Dst: core_120, State: segfetcher.Loaded}, + {Src: core_210, Dst: core_130, State: segfetcher.Loaded}, }, }, - ExpectCalls: func(db *mock_pathdb.MockPathDB) { - db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(core_210), - gomock.Eq(core_110), gomock.Any()).Return(futureT, nil) - db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(core_210), - gomock.Eq(core_120), gomock.Any()).Return(futureT, nil) - db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(core_210), - gomock.Eq(core_130), gomock.Any()).Return(futureT, nil) - db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ - SegTypes: []proto.PathSegType{proto.PathSegType_core}, - StartsAt: []addr.IA{core_110, core_120, core_130}, EndsAt: []addr.IA{core_210}, - })).Return(resultsFromSegs(tg.seg210_130, tg.seg210_130_2), nil) - }, - // Revoke the shorter path via 110, so it should only return the - // longer path via 120. - ExpectRevcache: func(t *testing.T, revCache *mock_revcache.MockRevCache) { - key110 := revcache.Key{IA: core_110, IfId: graph.If_110_X_130_A} - ksMatcher := keySetContains{keys: []revcache.Key{key110}} - srev, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) - require.NoError(t, err) - revCache.EXPECT().Get(gomock.Any(), ksMatcher).Return(revcache.Revocations{ - key110: srev, - }, nil) - revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes() - }, - ExpectedSegments: segfetcher.Segments{ - Core: seg.Segments{tg.seg210_130_2}, - }, }, "Core Down": { Req: segfetcher.RequestSet{ @@ -475,7 +530,7 @@ func TestResolver(t *testing.T) { }, ExpectedReqSet: segfetcher.RequestSet{ Cores: []segfetcher.Request{{Src: core_110, Dst: isd2}}, - Down: segfetcher.Request{Src: isd2, Dst: non_core_211}, + Down: segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Fetch}, }, }, "Core Down(cached)": { @@ -497,12 +552,16 @@ func TestResolver(t *testing.T) { Down: seg.Segments{tg.seg210_211}, }, ExpectedReqSet: segfetcher.RequestSet{ - Cores: []segfetcher.Request{{Src: core_110, Dst: core_210}}, + Cores: []segfetcher.Request{ + {Src: core_110, Dst: core_210, State: segfetcher.Fetch}, + }, + Down: segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Loaded}, }, }, "Core Down(passed)": { Req: segfetcher.RequestSet{ Cores: []segfetcher.Request{{Src: core_110, Dst: core_210}}, + Down: segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Loaded}, }, Segs: segfetcher.Segments{ Down: seg.Segments{tg.seg210_211}, @@ -515,7 +574,10 @@ func TestResolver(t *testing.T) { Down: seg.Segments{tg.seg210_211}, }, ExpectedReqSet: segfetcher.RequestSet{ - Cores: []segfetcher.Request{{Src: core_110, Dst: core_210}}, + Cores: []segfetcher.Request{ + {Src: core_110, Dst: core_210, State: segfetcher.Fetch}, + }, + Down: segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Loaded}, }, }, "Core(cached) Down(cached)": { @@ -536,13 +598,24 @@ func TestResolver(t *testing.T) { gomock.Eq(core_130), gomock.Any()).Return(futureT, nil) db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ SegTypes: []proto.PathSegType{proto.PathSegType_core}, - StartsAt: []addr.IA{core_120, core_130}, EndsAt: []addr.IA{core_210}, - })).Return(resultsFromSegs(tg.seg210_120, tg.seg210_130), nil) + StartsAt: []addr.IA{core_120}, EndsAt: []addr.IA{core_210}, + })).Return(resultsFromSegs(tg.seg210_120), nil) + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_core}, + StartsAt: []addr.IA{core_130}, EndsAt: []addr.IA{core_210}, + })).Return(resultsFromSegs(tg.seg210_130), nil) }, ExpectedSegments: segfetcher.Segments{ Down: seg.Segments{tg.seg120_111, tg.seg130_111}, Core: seg.Segments{tg.seg210_120, tg.seg210_130}, }, + ExpectedReqSet: segfetcher.RequestSet{ + Cores: []segfetcher.Request{ + {Src: core_210, Dst: core_120, State: segfetcher.Loaded}, + {Src: core_210, Dst: core_130, State: segfetcher.Loaded}, + }, + Down: segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Loaded}, + }, }, "Down": { Req: segfetcher.RequestSet{ @@ -555,7 +628,7 @@ func TestResolver(t *testing.T) { }, ExpectedReqSet: segfetcher.RequestSet{ - Down: segfetcher.Request{Src: core_120, Dst: non_core_111}, + Down: segfetcher.Request{Src: core_120, Dst: non_core_111, State: segfetcher.Fetch}, }, }, "Down(cached)": { @@ -574,6 +647,102 @@ func TestResolver(t *testing.T) { ExpectedSegments: segfetcher.Segments{ Down: seg.Segments{tg.seg120_111}, }, + ExpectedReqSet: segfetcher.RequestSet{ + Down: segfetcher.Request{Src: core_120, Dst: non_core_111, + State: segfetcher.Loaded}, + }, + }, + } + for name, test := range tests { + t.Run(name, test.run) + } +} + +func TestResolverWithRevocations(t *testing.T) { + rootCtrl := gomock.NewController(t) + defer rootCtrl.Finish() + tg := newTestGraph(rootCtrl) + futureT := time.Now().Add(2 * time.Minute) + + revoke := func(t *testing.T, revCache *mock_revcache.MockRevCache, key revcache.Key) { + ksMatcher := keySetContains{keys: []revcache.Key{key}} + srev, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) + require.NoError(t, err) + revCache.EXPECT().Get(gomock.Any(), ksMatcher). + Return(revcache.Revocations{key: srev}, nil) + } + tests := map[string]resolverTest{ + "Up wildcard (cached)": { + Req: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1}, + }, + ExpectCalls: func(db *mock_pathdb.MockPathDB) { + // cached up segments + db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(non_core_111), + gomock.Eq(isd1), gomock.Any()).Return(futureT, nil) + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_up}, + StartsAt: []addr.IA{isd1}, EndsAt: []addr.IA{non_core_111}, + })).Return(resultsFromSegs(tg.seg120_111, tg.seg130_111), nil) + }, + ExpectRevcache: func(t *testing.T, revCache *mock_revcache.MockRevCache) { + key111_120 := revcache.Key{IA: non_core_111, IfId: graph.If_111_B_120_X} + key111_130 := revcache.Key{IA: non_core_111, IfId: graph.If_111_A_130_B} + revoke(t, revCache, key111_120) + revoke(t, revCache, key111_130) + revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes() + }, + ExpectedSegments: segfetcher.Segments{ + Up: seg.Segments{}, + }, + ExpectedReqSet: segfetcher.RequestSet{ + Up: segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch}, + }, + }, + "Core (cached) with revocations returns partial result": { + Req: segfetcher.RequestSet{ + Cores: []segfetcher.Request{ + {Src: core_210, Dst: core_110}, + {Src: core_210, Dst: core_120}, + {Src: core_210, Dst: core_130}, + }, + }, + ExpectCalls: func(db *mock_pathdb.MockPathDB) { + db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(core_210), + gomock.Eq(core_110), gomock.Any()).Return(futureT, nil) + db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(core_210), + gomock.Eq(core_120), gomock.Any()).Return(futureT, nil) + db.EXPECT().GetNextQuery(gomock.Any(), gomock.Eq(core_210), + gomock.Eq(core_130), gomock.Any()).Return(futureT, nil) + db.EXPECT().Get(gomock.Any(), matchers.EqParams(&query.Params{ + SegTypes: []proto.PathSegType{proto.PathSegType_core}, + StartsAt: []addr.IA{core_130}, EndsAt: []addr.IA{core_210}, + })).Return(resultsFromSegs(tg.seg210_130, tg.seg210_130_2), nil) + // Other calls return 0 + db.EXPECT().Get(gomock.Any(), gomock.Any()).Times(2) + }, + // Revoke the shorter path via 110, so it should only return the + // longer path via 120. + ExpectRevcache: func(t *testing.T, revCache *mock_revcache.MockRevCache) { + key110 := revcache.Key{IA: core_110, IfId: graph.If_110_X_130_A} + ksMatcher := keySetContains{keys: []revcache.Key{key110}} + srev, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) + require.NoError(t, err) + revCache.EXPECT().Get(gomock.Any(), ksMatcher).Return(revcache.Revocations{ + key110: srev, + }, nil) + revCache.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes() + }, + ExpectedSegments: segfetcher.Segments{ + Core: seg.Segments{tg.seg210_130_2}, + }, + ExpectedReqSet: segfetcher.RequestSet{ + Cores: []segfetcher.Request{ + {Src: core_210, Dst: core_110, State: segfetcher.Loaded}, + {Src: core_210, Dst: core_120, State: segfetcher.Loaded}, + {Src: core_210, Dst: core_130, State: segfetcher.Loaded}, + }, + }, }, } for name, test := range tests { diff --git a/go/path_srv/internal/handlers/segrevoc.go b/go/path_srv/internal/handlers/segrevoc.go index 54442a9c12..6c7c5d318e 100644 --- a/go/path_srv/internal/handlers/segrevoc.go +++ b/go/path_srv/internal/handlers/segrevoc.go @@ -76,10 +76,6 @@ func (h *revocHandler) Handle() *infra.HandlerResult { sendAck(proto.Ack_ErrCode_reject, messenger.AckRejectFailedToVerify) return infra.MetricsErrInvalid } - // if err := h.NextQueryCleaner.ResetQueryCache(ctx, revInfo); err != nil { - // logger.Warn("Couldn't reset pathdb cache for revocation", "err", err) - // } - _, err = h.revCache.Insert(ctx, revocation) if err != nil { logger.Error("Failed to insert revInfo", "err", err) diff --git a/go/sciond/internal/servers/handlers.go b/go/sciond/internal/servers/handlers.go index 44b7439090..da3590ff85 100644 --- a/go/sciond/internal/servers/handlers.go +++ b/go/sciond/internal/servers/handlers.go @@ -263,13 +263,6 @@ func (h *RevNotificationHandler) Handle(ctx context.Context, conn net.PacketConn switch { case isValid(err): revReply.Result = sciond.RevValid - // revInfo, err := revNotification.SRevInfo.RevInfo() - // if err != nil { - // logger.Error("Failed to extract error from rev info", "err", err) - // } - // if err := h.NextQueryCleaner.ResetQueryCache(ctx, revInfo); err != nil { - // logger.Error("Failed to delete query cache", "err", err) - // } case isStale(err): revReply.Result = sciond.RevStale case isInvalid(err):