Skip to content

Commit

Permalink
Fix empty loading as well
Browse files Browse the repository at this point in the history
  • Loading branch information
lukedirtwalker committed Oct 11, 2019
1 parent 6d38438 commit 7e19c95
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 150 deletions.
39 changes: 21 additions & 18 deletions go/lib/infra/modules/segfetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error)
}
log.FromCtx(ctx).Trace("After resolving",
"req", reqSet, "segs", segs, "iteration", i+1)
if reqSet.IsEmpty() {
if reqSet.IsLoaded() {
break
}
// in 3 iteration (i==2) everything should be resolved.
Expand All @@ -138,7 +138,7 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error)
reqCtx, cancelF := context.WithTimeout(ctx, 3*time.Second)
replies := f.Requester.Request(reqCtx, reqSet)
// TODO(lukedirtwalker): We need to have early trigger for the last request.
if err := f.waitOnProcessed(ctx, replies); err != nil {
if reqSet, err = f.waitOnProcessed(ctx, replies, reqSet); err != nil {
cancelF()
return Segments{}, err
}
Expand All @@ -147,12 +147,14 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error)
return segs, nil
}

func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr) error {
func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr,
reqSet RequestSet) (RequestSet, error) {

logger := log.FromCtx(ctx)
for reply := range replies {
// TODO(lukedirtwalker): Should we do this in go routines?
if reply.Err != nil {
return reply.Err
return reqSet, reply.Err
}
if reply.Reply == nil || reply.Reply.Recs == nil {
continue
Expand All @@ -161,29 +163,30 @@ func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr
select {
case <-r.FullReplyProcessed():
if err := r.Err(); err != nil {
return err
return reqSet, err
}
// TODO(lukedirtwalker): move state update to separate func
if reqSet.Up.EqualAddr(reply.Req) {
reqSet.Up.State = Fetched
} else if reqSet.Down.EqualAddr(reply.Req) {
reqSet.Down.State = Fetched
} else {
for i, coreReq := range reqSet.Cores {
if coreReq.EqualAddr(reply.Req) {
reqSet.Cores[i].State = Fetched
}
}
}
// for _, rev := range r.Stats().VerifiedRevs {
// revInfo, err := rev.RevInfo()
// if err != nil {
// logger.Warn("Failed to extract rev info from verified rev",
// "err", err, "rev", rev)
// continue
// }
// // TODO(lukedirtwalker): collect all revInfos and delete only
// // once.
// f.NextQueryCleaner.ResetQueryCache(ctx, revInfo)
// }
nextQuery := f.nextQuery(r.Stats().VerifiedSegs)
_, err := f.PathDB.InsertNextQuery(ctx, reply.Req.Src, reply.Req.Dst, nil, nextQuery)
if err != nil {
logger.Warn("Failed to insert next query", "err", err)
}
case <-ctx.Done():
return ctx.Err()
return reqSet, ctx.Err()
}
}
return nil
return reqSet, nil
}

func (f *Fetcher) verifyServer(reply ReplyOrErr) net.Addr {
Expand Down
46 changes: 41 additions & 5 deletions go/lib/infra/modules/segfetcher/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,28 @@ import (
"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
)

// RequestState is the state the request is in.
type RequestState int

const (
// Unresolved means the request is not yet resolved.
Unresolved RequestState = iota
// Fetch means the request needs to be fetched.
Fetch
// Cached means the request should be cached locally and can be loaded from
// DB.
Cached
// Fetched means the request has been fetched and should be in the DB.
Fetched
// Loaded means the request has been loaded from the DB.
Loaded
)

// Request represents a path or segment request.
type Request struct {
Src addr.IA
Dst addr.IA
Src addr.IA
Dst addr.IA
State RequestState
}

// IsZero returns whether the request is empty.
Expand All @@ -38,16 +56,24 @@ func (r Request) ToSegReq() *path_mgmt.SegReq {
}
}

// EqualAddr returns whether the two request refer to the same src/dst.
func (r Request) EqualAddr(other Request) bool {
return r.Src.Equal(other.Src) && r.Dst.Equal(other.Dst)
}

// RequestSet is a set of requests.
type RequestSet struct {
Up Request
Cores Requests
Down Request
}

// IsEmpty returns whether the request set is empty.
func (r RequestSet) IsEmpty() bool {
return r.Up.IsZero() && r.Cores.IsEmpty() && r.Down.IsZero()
// IsLoaded returns true if all non-zero requests in the set are in state
// loaded.
func (r RequestSet) IsLoaded() bool {
return (r.Up.IsZero() || r.Up.State == Loaded) &&
(r.Down.IsZero() || r.Down.State == Loaded) &&
r.Cores.AllLoaded()
}

// Requests is a list of requests and provides some convenience methods on top
Expand All @@ -69,6 +95,16 @@ func (r Requests) IsEmpty() bool {
return len(r) == 0
}

// AllLoaded returns whether all entries in request have state loaded.
func (r Requests) AllLoaded() bool {
for _, req := range r {
if req.State != Loaded {
return false
}
}
return true
}

func (r Requests) extractIAs(extract func(Request) addr.IA) []addr.IA {
set := make(map[addr.IA]struct{})
for _, req := range r {
Expand Down
21 changes: 8 additions & 13 deletions go/lib/infra/modules/segfetcher/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,16 @@ type DefaultRequester struct {
DstProvider DstProvider
}

// Request the missing segments from the remote. Note that this might only
// fetch a part of the full request set, i.e. if up or down segments are set,
// cores are not yet fetched, assuming the cores are not resolved.
// Request all requests in the request set that are in fetch state.
func (r *DefaultRequester) Request(ctx context.Context, req RequestSet) <-chan ReplyOrErr {
switch {
case req.Up.IsZero() && req.Down.IsZero():
// only cores to fetch
return r.fetchReqs(ctx, req.Cores)
case req.Up.IsZero() && !req.Down.IsZero():
return r.fetchReqs(ctx, Requests{req.Down})
case !req.Up.IsZero() && req.Down.IsZero():
return r.fetchReqs(ctx, Requests{req.Up})
default:
return r.fetchReqs(ctx, Requests{req.Up, req.Down})
var reqs Requests
allReqs := append(Requests{req.Up, req.Down}, req.Cores...)
for _, req := range allReqs {
if req.State == Fetch {
reqs = append(reqs, req)
}
}
return r.fetchReqs(ctx, reqs)
}

func (r *DefaultRequester) fetchReqs(ctx context.Context, reqs Requests) <-chan ReplyOrErr {
Expand Down
16 changes: 7 additions & 9 deletions go/lib/infra/modules/segfetcher/requester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ var (
non_core_211 = xtest.MustParseIA("2-ff00:0:211")
non_core_212 = xtest.MustParseIA("2-ff00:0:212")

req_111_1 = segfetcher.Request{Src: non_core_111, Dst: isd1}
req_1_111 = segfetcher.Request{Src: isd1, Dst: non_core_111}
req_111_1 = segfetcher.Request{Src: non_core_111, Dst: isd1, State: segfetcher.Fetch}
req_1_111 = segfetcher.Request{Src: isd1, Dst: non_core_111, State: segfetcher.Fetch}
req_1_2 = segfetcher.Request{Src: isd1, Dst: isd2}
req_1_210 = segfetcher.Request{Src: isd1, Dst: core_210}
req_2_211 = segfetcher.Request{Src: isd2, Dst: non_core_211}
req_2_211 = segfetcher.Request{Src: isd2, Dst: non_core_211, State: segfetcher.Fetch}
req_210_1 = segfetcher.Request{Src: core_210, Dst: isd1}
req_210_110 = segfetcher.Request{Src: core_210, Dst: core_110}
req_210_120 = segfetcher.Request{Src: core_210, Dst: core_120}
req_210_130 = segfetcher.Request{Src: core_210, Dst: core_130}
req_210_110 = segfetcher.Request{Src: core_210, Dst: core_110, State: segfetcher.Fetch}
req_210_120 = segfetcher.Request{Src: core_210, Dst: core_120, State: segfetcher.Fetch}
req_210_130 = segfetcher.Request{Src: core_210, Dst: core_130, State: segfetcher.Fetch}
)

func TestRequester(t *testing.T) {
Expand Down Expand Up @@ -85,9 +85,7 @@ func TestRequester(t *testing.T) {
}
api.EXPECT().GetSegs(gomock.Any(), gomock.Eq(req), gomock.Any(), gomock.Any()).
Return(reply, nil)
return []segfetcher.ReplyOrErr{
{Req: segfetcher.Request{Src: non_core_111, Dst: isd1}, Reply: reply},
}
return []segfetcher.ReplyOrErr{{Req: req_111_1, Reply: reply}}
},
},
"Down only": {
Expand Down
82 changes: 51 additions & 31 deletions go/lib/infra/modules/segfetcher/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,46 +62,56 @@ func (r *DefaultResolver) Resolve(ctx context.Context, segs Segments,
req RequestSet) (Segments, RequestSet, error) {

var err error
if !req.Up.IsZero() {
if !req.Up.IsZero() && (req.Up.State == Unresolved || req.Up.State == Fetched) {
if segs, req, err = r.resolveUpSegs(ctx, segs, req); err != nil {
return segs, req, err
}
}
if !req.Down.IsZero() {
if !req.Down.IsZero() && (req.Down.State == Unresolved || req.Down.State == Fetched) {
if segs, req, err = r.resolveDownSegs(ctx, segs, req); err != nil {
return segs, req, err
}
}
// If there are still up or down segments to request, or if there are no
// core segments no more action can be done here.
if !req.Up.IsZero() || !req.Down.IsZero() || req.Cores.IsEmpty() {
if (!req.Up.IsZero() && req.Up.State != Loaded) ||
(!req.Down.IsZero() && req.Down.State != Loaded) ||
req.Cores.IsEmpty() {
return segs, req, nil
}
// now resolve core segs:
req.Cores, err = r.expandCores(segs, req)
if err != nil {
return segs, req, err
}
var cachedReqs Requests
if req.Cores, cachedReqs, err = r.resolveCores(ctx, req); err != nil {
if req.Cores, err = r.resolveCores(ctx, req); err != nil {
return segs, req, err
}
if len(req.Cores) == 0 {
req.Cores = nil
}
if len(cachedReqs) > 0 {
for i, coreReq := range req.Cores {
if coreReq.State != Cached && coreReq.State != Fetched {
continue
}
coreRes, err := r.DB.Get(ctx, &query.Params{
StartsAt: cachedReqs.DstIAs(),
EndsAt: cachedReqs.SrcIAs(),
StartsAt: []addr.IA{coreReq.Dst},
EndsAt: []addr.IA{coreReq.Src},
SegTypes: []proto.PathSegType{proto.PathSegType_core},
})
if err != nil {
return segs, req, err
}
segs.Core, err = r.resultsToSegs(ctx, coreRes)
coreSegs, filtered, err := r.resultsToSegs(ctx, coreRes)
if err != nil {
return segs, req, err
}
if len(coreSegs) == 0 && filtered > 0 && coreReq.State != Fetched {
req.Cores[i].State = Fetch
} else {
req.Cores[i].State = Loaded
}
segs.Core = append(segs.Core, coreSegs...)
}
return segs, req, nil
}
Expand All @@ -125,9 +135,12 @@ func (r *DefaultResolver) resolveDownSegs(ctx context.Context, segs Segments,
func (r *DefaultResolver) resolveSegment(ctx context.Context,
req Request, consDir bool) (seg.Segments, Request, error) {

fetch, err := r.needsFetching(ctx, req)
if err != nil || fetch {
return nil, req, err
if req.State == Unresolved {
fetch, err := r.needsFetching(ctx, req)
if err != nil || fetch {
req.State = Fetch
return nil, req, err
}
}
start, end := req.Src, req.Dst
segType := proto.PathSegType_down
Expand All @@ -143,8 +156,18 @@ func (r *DefaultResolver) resolveSegment(ctx context.Context,
if err != nil {
return nil, req, err
}
segs, err := r.resultsToSegs(ctx, res)
return segs, Request{}, err
segs, filtered, err := r.resultsToSegs(ctx, res)
// because of revocations our cache is empty, so refetch
if len(segs) == 0 && filtered > 0 {
if req.State == Unresolved {
req.State = Fetch
} else {
req.State = Loaded
}
return segs, req, err
}
req.State = Loaded
return segs, req, err
}

func (r *DefaultResolver) needsFetching(ctx context.Context, req Request) (bool, error) {
Expand Down Expand Up @@ -248,44 +271,41 @@ func (r *DefaultResolver) expandNonCoreToNonCore(segs Segments,
return coreReqs, nil
}

// resolveCores returns cores that need to be requested and the ones which are
// already cached.
// resolveCores returns cores requests classified with a state.
func (r *DefaultResolver) resolveCores(ctx context.Context,
req RequestSet) (Requests, Requests, error) {
req RequestSet) (Requests, error) {

var cachedReqs Requests
remainingCores := req.Cores[:0]
needsFetching := make(map[Request]bool)
for _, coreReq := range req.Cores {
for i, coreReq := range req.Cores {
if coreReq.State == Fetched {
needsFetching[coreReq] = false
}
coreFetch, ok := needsFetching[coreReq]
if !ok {
var err error
if coreFetch, err = r.needsFetching(ctx, coreReq); err != nil {
return remainingCores, cachedReqs, err
return req.Cores, err
}
needsFetching[coreReq] = coreFetch
}
if coreFetch {
remainingCores = append(remainingCores, coreReq)
req.Cores[i].State = Fetch
} else {
cachedReqs = append(cachedReqs, coreReq)
req.Cores[i].State = Cached
}
}
return remainingCores, cachedReqs, nil
return req.Cores, nil
}

func (r *DefaultResolver) resultsToSegs(ctx context.Context,
results query.Results) (seg.Segments, error) {
results query.Results) (seg.Segments, int, error) {

segs := results.Segs()
if r.IgnoreRevs {
return segs, nil
}
_, err := segs.FilterSegsErr(func(ps *seg.PathSegment) (bool, error) {
filtered, err := segs.FilterSegsErr(func(ps *seg.PathSegment) (bool, error) {
return revcache.NoRevokedHopIntf(ctx, r.RevCache, ps)
})
if err != nil {
return nil, err
return nil, 0, err
}
return segs, nil
return segs, filtered, nil
}
Loading

0 comments on commit 7e19c95

Please sign in to comment.