Skip to content

Commit

Permalink
PS,SD: Use revocations from SegReply to invalidate NextQuery (#3058)
Browse files Browse the repository at this point in the history
In the PS and SD we invalidate the NextQuery for all segments that have received a revocation. But currently we only do this for explicit revocations that come in as Revocation message, we should also do it for revocations that we receive in a SegReply from another PS.
This commit also invalidates NextQuery cache for revocations received in the SegReply.

Fixes #3057
  • Loading branch information
lukedirtwalker authored Aug 29, 2019
1 parent 63b110f commit 40c4563
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 14 deletions.
25 changes: 19 additions & 6 deletions go/lib/infra/modules/segfetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (cfg FetcherConfig) New() *Fetcher {
},
PathDB: cfg.PathDB,
QueryInterval: cfg.QueryInterval,
NextQueryCleaner: NextQueryCleaner{PathDB: cfg.PathDB},
CryptoLookupAtLocalCS: cfg.SciondMode,
}
}
Expand All @@ -81,6 +82,7 @@ type Fetcher struct {
ReplyHandler ReplyHandler
PathDB pathdb.PathDB
QueryInterval time.Duration
NextQueryCleaner NextQueryCleaner
CryptoLookupAtLocalCS bool
}

Expand All @@ -100,19 +102,21 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error)
var segs Segments
i := 0
for {
log.FromCtx(ctx).Trace("Request to process", "req", reqSet, "segs", segs)
log.FromCtx(ctx).Trace("Request to process",
"req", reqSet, "segs", segs, "iteration", i+1)
segs, reqSet, err = f.Resolver.Resolve(ctx, segs, reqSet)
if err != nil {
return Segments{}, err
}
log.FromCtx(ctx).Trace("After resolving", "req", reqSet, "segs", segs)
log.FromCtx(ctx).Trace("After resolving",
"req", reqSet, "segs", segs, "iteration", i+1)
if reqSet.IsEmpty() {
break
}
if i > 3 {
log.FromCtx(ctx).Error("No convergence in lookup", "iteration", i+1)
return segs, common.NewBasicError("Segment lookup doesn't converge", nil,
"iterations", i)
"iterations", i+1)
}
// XXX(lukedirtwalker): Optimally we wouldn't need a different timeout
// here. The problem is that revocations can't be differentiated from
Expand All @@ -131,6 +135,7 @@ func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error)
}

func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr) error {
logger := log.FromCtx(ctx)
for reply := range replies {
// TODO(lukedirtwalker): Should we do this in go routines?
if reply.Err != nil {
Expand All @@ -146,15 +151,23 @@ func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr
return err
}
queryInt := f.QueryInterval
for _, rev := range r.VerifiedRevs() {
revInfo, err := rev.RevInfo()
if err != nil {
logger.Warn("Failed to extract rev info from verified rev",
"err", err, "rev", rev)
continue
}
f.NextQueryCleaner.ResetQueryCache(ctx, revInfo)
}
// TODO(lukedirtwalker): make the short interval configurable
// TODO(lukedirtwalker): only count successfully verified entries:
if len(reply.Reply.Recs.Recs) <= 0 {
if r.VerifiedSegs() <= 0 {
queryInt = 2 * time.Second
}
_, err := f.PathDB.InsertNextQuery(ctx, reply.Req.Src, reply.Req.Dst, nil,
time.Now().Add(queryInt))
if err != nil {
log.FromCtx(ctx).Warn("Failed to insert next query", "err", err)
logger.Warn("Failed to insert next query", "err", err)
}
case <-ctx.Done():
return ctx.Err()
Expand Down
5 changes: 3 additions & 2 deletions go/lib/infra/modules/segfetcher/revocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c *NextQueryCleaner) ResetQueryCache(ctx context.Context, revInfo *path_mg
func DeleteNextQueryEntries(ctx context.Context, tx pathdb.Transaction,
results query.Results) error {

logger := log.FromCtx(ctx)
nextQueriesToDelete := make(map[Request]struct{})
for _, r := range results {
var req Request
Expand All @@ -66,13 +67,13 @@ func DeleteNextQueryEntries(ctx context.Context, tx pathdb.Transaction,
case proto.PathSegType_down:
req = Request{Src: addr.IA{I: r.Seg.FirstIA().I}, Dst: r.Seg.LastIA()}
default:
log.Error("Invalid seg type", "segType", r.Type)
logger.Error("Invalid seg type", "segType", r.Type)
continue
}
nextQueriesToDelete[req] = struct{}{}
}
for nq := range nextQueriesToDelete {
log.Trace("Delete NQ", "src", nq.Src, "dst", nq.Dst)
logger.Trace("Delete NQ", "src", nq.Src, "dst", nq.Dst)
if _, err := tx.DeleteNQ(ctx, nq.Src, nq.Dst, nil); err != nil {
return err
}
Expand Down
32 changes: 26 additions & 6 deletions go/lib/infra/modules/segfetcher/segreplyhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (s *DefaultStorage) StoreRevs(ctx context.Context,
type ProcessedResult struct {
early chan int
full chan struct{}
segs int
revs []*path_mgmt.SignedRevInfo
err error
verifyErrs []error
}
Expand All @@ -128,6 +130,18 @@ func (r *ProcessedResult) FullReplyProcessed() <-chan struct{} {
return r.full
}

// VerifiedSegs returns the amount of verified segs. This should only be
// accessed after FullReplyProcessed channel has been closed.
func (r *ProcessedResult) VerifiedSegs() int {
return r.segs
}

// VerifiedRevs returns the verified revocations. This should only be accessed
// after FullReplyProcessed channel has been closed.
func (r *ProcessedResult) VerifiedRevs() []*path_mgmt.SignedRevInfo {
return r.revs
}

// Err indicates the error that happened when storing the segments. This should
// only be accessed after FullReplyProcessed channel has been closed.
func (r *ProcessedResult) Err() error {
Expand Down Expand Up @@ -182,6 +196,7 @@ func (h *SegReplyHandler) verifyAndStore(ctx context.Context,
verifiedUnits := make([]segverifier.UnitResult, 0, units)
var allVerifyErrs []error
totalSegsSaved := 0
var allRevs []*path_mgmt.SignedRevInfo
defer close(result.full)
defer func() {
if earlyTrigger != nil {
Expand All @@ -196,9 +211,10 @@ func (h *SegReplyHandler) verifyAndStore(ctx context.Context,
case <-earlyTrigger:
// Reduce u since this does not process an additional unit.
u--
segs, verifyErrs, err := h.storeResults(ctx, verifiedUnits)
segs, revs, verifyErrs, err := h.storeResults(ctx, verifiedUnits)
allVerifyErrs = append(allVerifyErrs, verifyErrs...)
totalSegsSaved += segs
allRevs = append(allRevs, revs...)
result.early <- segs
// TODO(lukedirtwalker): log early store failure
if err == nil {
Expand All @@ -209,14 +225,16 @@ func (h *SegReplyHandler) verifyAndStore(ctx context.Context,
earlyTrigger = nil
}
}
segs, verifyErrs, err := h.storeResults(ctx, verifiedUnits)
segs, revs, verifyErrs, err := h.storeResults(ctx, verifiedUnits)
result.verifyErrs = append(allVerifyErrs, verifyErrs...)
result.err = err
totalSegsSaved += segs
result.segs = totalSegsSaved
result.revs = append(allRevs, revs...)
}

func (h *SegReplyHandler) storeResults(ctx context.Context,
verifiedUnits []segverifier.UnitResult) (int, []error, error) {
verifiedUnits []segverifier.UnitResult) (int, []*path_mgmt.SignedRevInfo, []error, error) {

var verifyErrs []error
segs := make([]*SegWithHP, 0, len(verifiedUnits))
Expand All @@ -242,11 +260,13 @@ func (h *SegReplyHandler) storeResults(ctx context.Context,
}
if len(segs) > 0 {
if err := h.Storage.StoreSegs(ctx, segs); err != nil {
return 0, verifyErrs, err
return 0, nil, verifyErrs, err
}
}
if len(revs) > 0 {
return len(segs), verifyErrs, h.Storage.StoreRevs(ctx, revs)
if err := h.Storage.StoreRevs(ctx, revs); err != nil {
return len(segs), nil, verifyErrs, h.Storage.StoreRevs(ctx, revs)
}
}
return len(segs), verifyErrs, nil
return len(segs), revs, verifyErrs, nil
}
17 changes: 17 additions & 0 deletions go/lib/infra/modules/segfetcher/segreplyhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

var TestTimeout = time.Second

// TestReplyHandlerEmptyReply test that we can handle an empty SegReply.
func TestReplyHandlerEmptyReply(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -62,8 +63,12 @@ func TestReplyHandlerEmptyReply(t *testing.T) {
xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2)
assert.NoError(t, r.Err())
assert.Nil(t, r.VerificationErrors())
assert.Zero(t, r.VerifiedSegs())
assert.Empty(t, r.VerifiedRevs())
}

// TestReplyHandlerErrors tests erros that happen during verification are
// properly stored in the result struct.
func TestReplyHandlerErrors(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -116,8 +121,12 @@ func TestReplyHandlerErrors(t *testing.T) {
xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2)
assert.NoError(t, r.Err())
assert.Len(t, r.VerificationErrors(), len(verifyErrs))
assert.Zero(t, r.VerifiedSegs())
assert.Empty(t, r.VerifiedRevs())
}

// TestReplyHandlerNoErrors tests the happy case of the reply handler: 3
// segments and 1 revocation are successfully verified and stored.
func TestReplyHandlerNoErrors(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -178,6 +187,8 @@ func TestReplyHandlerNoErrors(t *testing.T) {
xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2)
assert.NoError(t, r.Err())
assert.Nil(t, r.VerificationErrors())
assert.Equal(t, 3, r.VerifiedSegs())
assert.ElementsMatch(t, []*path_mgmt.SignedRevInfo{rev1}, r.VerifiedRevs())
}

func TestReplyHandlerAllVerifiedInEarlyInterval(t *testing.T) {
Expand Down Expand Up @@ -228,6 +239,8 @@ func TestReplyHandlerAllVerifiedInEarlyInterval(t *testing.T) {
xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2)
assert.NoError(t, r.Err())
assert.Nil(t, r.VerificationErrors())
assert.Equal(t, 2, r.VerifiedSegs())
assert.ElementsMatch(t, []*path_mgmt.SignedRevInfo{rev1}, r.VerifiedRevs())
}

func TestReplyHandlerEarlyTriggerStorageError(t *testing.T) {
Expand Down Expand Up @@ -282,6 +295,8 @@ func TestReplyHandlerEarlyTriggerStorageError(t *testing.T) {
xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2)
assert.NoError(t, r.Err())
assert.Nil(t, r.VerificationErrors())
assert.Equal(t, 2, r.VerifiedSegs())
assert.ElementsMatch(t, []*path_mgmt.SignedRevInfo{rev1}, r.VerifiedRevs())
}

func TestReplyHandlerStorageError(t *testing.T) {
Expand Down Expand Up @@ -330,6 +345,8 @@ func TestReplyHandlerStorageError(t *testing.T) {
xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2)
assert.Error(t, r.Err())
assert.Nil(t, r.VerificationErrors())
assert.Zero(t, r.VerifiedSegs())
assert.Empty(t, r.VerifiedRevs())
}

func AssertChanEmpty(t *testing.T, ch <-chan struct{}) {
Expand Down

0 comments on commit 40c4563

Please sign in to comment.