From fb4bb9589bd19851e2a98651a2db5809c7d6a4d7 Mon Sep 17 00:00:00 2001 From: Lukas Vogel Date: Mon, 2 Sep 2019 15:16:40 +0200 Subject: [PATCH] Add seghandler to verify and store segs & revs Generalizes `SegReplyHandler` from segfetcher package into `seghandler.Handler`. This can be used to for all kind of segment storing and verification also for hidden paths etc. Fixes #3076 --- go/lib/infra/modules/segfetcher/BUILD.bazel | 6 +- go/lib/infra/modules/segfetcher/fetcher.go | 23 +- .../segfetcher/mock_segfetcher/BUILD.bazel | 2 +- .../segfetcher/mock_segfetcher/segfetcher.go | 97 +------ .../modules/segfetcher/segreplyhandler.go | 272 ------------------ go/lib/infra/modules/seghandler/BUILD.bazel | 45 +++ .../seghandler/mock_seghandler/BUILD.bazel | 14 + .../seghandler/mock_seghandler/seghandler.go | 104 +++++++ go/lib/infra/modules/seghandler/result.go | 62 ++++ go/lib/infra/modules/seghandler/seghandler.go | 162 +++++++++++ .../seghandler_test.go} | 136 ++++----- go/lib/infra/modules/seghandler/storage.go | 92 ++++++ go/lib/infra/modules/seghandler/verifier.go | 41 +++ .../infra/modules/segverifier/segverifier.go | 47 --- go/path_srv/internal/handlers/BUILD.bazel | 1 + go/path_srv/internal/handlers/common.go | 70 ----- go/path_srv/internal/handlers/segreg.go | 20 +- go/path_srv/internal/handlers/segsync.go | 20 +- tools/gomocks | 3 +- 19 files changed, 649 insertions(+), 568 deletions(-) delete mode 100644 go/lib/infra/modules/segfetcher/segreplyhandler.go create mode 100644 go/lib/infra/modules/seghandler/BUILD.bazel create mode 100644 go/lib/infra/modules/seghandler/mock_seghandler/BUILD.bazel create mode 100644 go/lib/infra/modules/seghandler/mock_seghandler/seghandler.go create mode 100644 go/lib/infra/modules/seghandler/result.go create mode 100644 go/lib/infra/modules/seghandler/seghandler.go rename go/lib/infra/modules/{segfetcher/segreplyhandler_test.go => seghandler/seghandler_test.go} (71%) create mode 100644 go/lib/infra/modules/seghandler/storage.go create mode 100644 go/lib/infra/modules/seghandler/verifier.go diff --git a/go/lib/infra/modules/segfetcher/BUILD.bazel b/go/lib/infra/modules/segfetcher/BUILD.bazel index 56d94d6fe3..daa00a8aec 100644 --- a/go/lib/infra/modules/segfetcher/BUILD.bazel +++ b/go/lib/infra/modules/segfetcher/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "requester.go", "resolver.go", "revocation.go", - "segreplyhandler.go", "segs.go", "splitter.go", "validator.go", @@ -23,7 +22,7 @@ go_library( "//go/lib/ctrl/seg:go_default_library", "//go/lib/infra:go_default_library", "//go/lib/infra/messenger:go_default_library", - "//go/lib/infra/modules/segverifier:go_default_library", + "//go/lib/infra/modules/seghandler:go_default_library", "//go/lib/log:go_default_library", "//go/lib/pathdb:go_default_library", "//go/lib/pathdb/query:go_default_library", @@ -39,7 +38,6 @@ go_test( "requester_test.go", "resolver_test.go", "revocation_test.go", - "segreplyhandler_test.go", ], embed = [":go_default_library"], deps = [ @@ -49,8 +47,6 @@ go_test( "//go/lib/ctrl/seg:go_default_library", "//go/lib/infra:go_default_library", "//go/lib/infra/modules/segfetcher/mock_segfetcher:go_default_library", - "//go/lib/infra/modules/segverifier:go_default_library", - "//go/lib/mocks/net/mock_net:go_default_library", "//go/lib/pathdb/mock_pathdb:go_default_library", "//go/lib/pathdb/query:go_default_library", "//go/lib/revcache:go_default_library", diff --git a/go/lib/infra/modules/segfetcher/fetcher.go b/go/lib/infra/modules/segfetcher/fetcher.go index 471ef1dc96..ae2604ec04 100644 --- a/go/lib/infra/modules/segfetcher/fetcher.go +++ b/go/lib/infra/modules/segfetcher/fetcher.go @@ -21,12 +21,20 @@ import ( "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" + "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/modules/seghandler" "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/pathdb" "github.com/scionproto/scion/go/lib/revcache" ) +// ReplyHandler handles replies. +type ReplyHandler interface { + Handle(ctx context.Context, recs seghandler.Segments, server net.Addr, + earlyTrigger <-chan struct{}) *seghandler.ProcessedResult +} + // FetcherConfig is the configuration for the fetcher. type FetcherConfig struct { // QueryInterval specifies after how much time segments should be @@ -62,9 +70,9 @@ func (cfg FetcherConfig) New() *Fetcher { Splitter: cfg.Splitter, Resolver: NewResolver(cfg.PathDB, cfg.RevCache, !cfg.SciondMode), Requester: &DefaultRequester{API: cfg.RequestAPI, DstProvider: cfg.DstProvider}, - ReplyHandler: &SegReplyHandler{ - Verifier: &SegVerifier{Verifier: cfg.VerificationFactory.NewVerifier()}, - Storage: &DefaultStorage{PathDB: cfg.PathDB, RevCache: cfg.RevCache}, + ReplyHandler: &seghandler.Handler{ + Verifier: &seghandler.DefaultVerifier{Verifier: cfg.VerificationFactory.NewVerifier()}, + Storage: &seghandler.DefaultStorage{PathDB: cfg.PathDB, RevCache: cfg.RevCache}, }, PathDB: cfg.PathDB, QueryInterval: cfg.QueryInterval, @@ -144,7 +152,7 @@ func (f *Fetcher) waitOnProcessed(ctx context.Context, replies <-chan ReplyOrErr if reply.Reply == nil || reply.Reply.Recs == nil { continue } - r := f.ReplyHandler.Handle(ctx, reply.Reply, f.verifyServer(reply), nil) + r := f.ReplyHandler.Handle(ctx, replyToRecs(reply.Reply), f.verifyServer(reply), nil) select { case <-r.FullReplyProcessed(): if err := r.Err(); err != nil { @@ -182,3 +190,10 @@ func (f *Fetcher) verifyServer(reply ReplyOrErr) net.Addr { } return reply.Peer } + +func replyToRecs(reply *path_mgmt.SegReply) seghandler.Segments { + return seghandler.Segments{ + Segs: reply.Recs.Recs, + SRevInfos: reply.Recs.SRevInfos, + } +} diff --git a/go/lib/infra/modules/segfetcher/mock_segfetcher/BUILD.bazel b/go/lib/infra/modules/segfetcher/mock_segfetcher/BUILD.bazel index bad22f4c37..ecc17cb448 100644 --- a/go/lib/infra/modules/segfetcher/mock_segfetcher/BUILD.bazel +++ b/go/lib/infra/modules/segfetcher/mock_segfetcher/BUILD.bazel @@ -8,7 +8,7 @@ go_library( deps = [ "//go/lib/ctrl/path_mgmt:go_default_library", "//go/lib/infra/modules/segfetcher:go_default_library", - "//go/lib/infra/modules/segverifier:go_default_library", + "//go/lib/infra/modules/seghandler:go_default_library", "@com_github_golang_mock//gomock:go_default_library", ], ) diff --git a/go/lib/infra/modules/segfetcher/mock_segfetcher/segfetcher.go b/go/lib/infra/modules/segfetcher/mock_segfetcher/segfetcher.go index f4ea0593e9..b1f2db69f5 100644 --- a/go/lib/infra/modules/segfetcher/mock_segfetcher/segfetcher.go +++ b/go/lib/infra/modules/segfetcher/mock_segfetcher/segfetcher.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/scionproto/scion/go/lib/infra/modules/segfetcher (interfaces: DstProvider,ReplyHandler,Requester,RequestAPI,Resolver,Splitter,Storage,Validator,Verifier) +// Source: github.com/scionproto/scion/go/lib/infra/modules/segfetcher (interfaces: DstProvider,ReplyHandler,Requester,RequestAPI,Resolver,Splitter,Validator) // Package mock_segfetcher is a generated GoMock package. package mock_segfetcher @@ -9,7 +9,7 @@ import ( gomock "github.com/golang/mock/gomock" path_mgmt "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" segfetcher "github.com/scionproto/scion/go/lib/infra/modules/segfetcher" - segverifier "github.com/scionproto/scion/go/lib/infra/modules/segverifier" + seghandler "github.com/scionproto/scion/go/lib/infra/modules/seghandler" net "net" reflect "reflect" ) @@ -76,10 +76,10 @@ func (m *MockReplyHandler) EXPECT() *MockReplyHandlerMockRecorder { } // Handle mocks base method -func (m *MockReplyHandler) Handle(arg0 context.Context, arg1 *path_mgmt.SegReply, arg2 net.Addr, arg3 <-chan struct{}) *segfetcher.ProcessedResult { +func (m *MockReplyHandler) Handle(arg0 context.Context, arg1 seghandler.Segments, arg2 net.Addr, arg3 <-chan struct{}) *seghandler.ProcessedResult { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Handle", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(*segfetcher.ProcessedResult) + ret0, _ := ret[0].(*seghandler.ProcessedResult) return ret0 } @@ -241,57 +241,6 @@ func (mr *MockSplitterMockRecorder) Split(arg0, arg1 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Split", reflect.TypeOf((*MockSplitter)(nil).Split), arg0, arg1) } -// MockStorage is a mock of Storage interface -type MockStorage struct { - ctrl *gomock.Controller - recorder *MockStorageMockRecorder -} - -// MockStorageMockRecorder is the mock recorder for MockStorage -type MockStorageMockRecorder struct { - mock *MockStorage -} - -// NewMockStorage creates a new mock instance -func NewMockStorage(ctrl *gomock.Controller) *MockStorage { - mock := &MockStorage{ctrl: ctrl} - mock.recorder = &MockStorageMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockStorage) EXPECT() *MockStorageMockRecorder { - return m.recorder -} - -// StoreRevs mocks base method -func (m *MockStorage) StoreRevs(arg0 context.Context, arg1 []*path_mgmt.SignedRevInfo) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StoreRevs", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// StoreRevs indicates an expected call of StoreRevs -func (mr *MockStorageMockRecorder) StoreRevs(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreRevs", reflect.TypeOf((*MockStorage)(nil).StoreRevs), arg0, arg1) -} - -// StoreSegs mocks base method -func (m *MockStorage) StoreSegs(arg0 context.Context, arg1 []*segfetcher.SegWithHP) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StoreSegs", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// StoreSegs indicates an expected call of StoreSegs -func (mr *MockStorageMockRecorder) StoreSegs(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreSegs", reflect.TypeOf((*MockStorage)(nil).StoreSegs), arg0, arg1) -} - // MockValidator is a mock of Validator interface type MockValidator struct { ctrl *gomock.Controller @@ -328,41 +277,3 @@ func (mr *MockValidatorMockRecorder) Validate(arg0, arg1 interface{}) *gomock.Ca mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Validate", reflect.TypeOf((*MockValidator)(nil).Validate), arg0, arg1) } - -// MockVerifier is a mock of Verifier interface -type MockVerifier struct { - ctrl *gomock.Controller - recorder *MockVerifierMockRecorder -} - -// MockVerifierMockRecorder is the mock recorder for MockVerifier -type MockVerifierMockRecorder struct { - mock *MockVerifier -} - -// NewMockVerifier creates a new mock instance -func NewMockVerifier(ctrl *gomock.Controller) *MockVerifier { - mock := &MockVerifier{ctrl: ctrl} - mock.recorder = &MockVerifierMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockVerifier) EXPECT() *MockVerifierMockRecorder { - return m.recorder -} - -// Verify mocks base method -func (m *MockVerifier) Verify(arg0 context.Context, arg1 *path_mgmt.SegReply, arg2 net.Addr) (chan segverifier.UnitResult, int) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Verify", arg0, arg1, arg2) - ret0, _ := ret[0].(chan segverifier.UnitResult) - ret1, _ := ret[1].(int) - return ret0, ret1 -} - -// Verify indicates an expected call of Verify -func (mr *MockVerifierMockRecorder) Verify(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Verify", reflect.TypeOf((*MockVerifier)(nil).Verify), arg0, arg1, arg2) -} diff --git a/go/lib/infra/modules/segfetcher/segreplyhandler.go b/go/lib/infra/modules/segfetcher/segreplyhandler.go deleted file mode 100644 index 45cddbf3bd..0000000000 --- a/go/lib/infra/modules/segfetcher/segreplyhandler.go +++ /dev/null @@ -1,272 +0,0 @@ -// Copyright 2019 Anapaya Systems -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segfetcher - -import ( - "context" - "fmt" - "net" - - "github.com/scionproto/scion/go/lib/common" - "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" - "github.com/scionproto/scion/go/lib/ctrl/seg" - "github.com/scionproto/scion/go/lib/infra" - "github.com/scionproto/scion/go/lib/infra/modules/segverifier" - "github.com/scionproto/scion/go/lib/log" - "github.com/scionproto/scion/go/lib/pathdb" - "github.com/scionproto/scion/go/lib/pathdb/query" - "github.com/scionproto/scion/go/lib/revcache" -) - -// Verifier is used to verify a segment reply. -type Verifier interface { - Verify(context.Context, *path_mgmt.SegReply, net.Addr) (chan segverifier.UnitResult, int) -} - -// SegVerifier is a convenience wrapper around segverifier that implements -// the Verifier interface. -type SegVerifier struct { - Verifier infra.Verifier -} - -// Verify calls segverifier for the given reply. -func (v *SegVerifier) Verify(ctx context.Context, reply *path_mgmt.SegReply, - server net.Addr) (chan segverifier.UnitResult, int) { - - return segverifier.StartVerification(ctx, v.Verifier, server, reply.Recs.Recs, - reply.Recs.SRevInfos) -} - -// SegWithHP is a segment with hidden path cfg ids. -type SegWithHP struct { - Seg *seg.Meta - HPCfgIds []*query.HPCfgID -} - -func (s *SegWithHP) String() string { - return fmt.Sprintf("{Seg: %v, HPCfgIds: %v}", s.Seg, s.HPCfgIds) -} - -// Storage is used to store segments and revocations. -type Storage interface { - StoreSegs(context.Context, []*SegWithHP) error - StoreRevs(context.Context, []*path_mgmt.SignedRevInfo) error -} - -// DefaultStorage wraps path DB and revocation cache and offers -// convenience methods that implement the Storage interface. -type DefaultStorage struct { - PathDB pathdb.PathDB - RevCache revcache.RevCache -} - -// StoreSegs stores the given segments in the pathdb in a transaction. -func (s *DefaultStorage) StoreSegs(ctx context.Context, segs []*SegWithHP) error { - tx, err := s.PathDB.BeginTransaction(ctx, nil) - if err != nil { - return err - } - var insertedSegmentIDs []string - defer tx.Rollback() - for _, seg := range segs { - if n, err := tx.InsertWithHPCfgIDs(ctx, seg.Seg, seg.HPCfgIds); err != nil { - return err - } else if n > 0 { - insertedSegmentIDs = append(insertedSegmentIDs, seg.Seg.Segment.GetLoggingID()) - } - } - if err := tx.Commit(); err != nil { - return err - } - if len(insertedSegmentIDs) > 0 { - log.FromCtx(ctx).Debug("Segments inserted in DB", "segments", insertedSegmentIDs) - } - return nil -} - -// StoreRevs stores the given revocations in the revocation cache. -func (s *DefaultStorage) StoreRevs(ctx context.Context, - revs []*path_mgmt.SignedRevInfo) error { - - for _, rev := range revs { - if _, err := s.RevCache.Insert(ctx, rev); err != nil { - return err - } - } - return nil -} - -// ProcessedResult is the result of handling a segment reply. -type ProcessedResult struct { - early chan int - full chan struct{} - segs int - revs []*path_mgmt.SignedRevInfo - err error - verifyErrs []error -} - -// EarlyTriggerProcessed returns a channel that will contain the number of -// successfully stored segments once it is done processing the early trigger. -func (r *ProcessedResult) EarlyTriggerProcessed() <-chan int { - return r.early -} - -// FullReplyProcessed returns a channel that will be closed once the full reply -// has been processed. -func (r *ProcessedResult) FullReplyProcessed() <-chan struct{} { - return r.full -} - -// VerifiedSegs returns the amount of verified segs. This should only be -// accessed after FullReplyProcessed channel has been closed. -func (r *ProcessedResult) VerifiedSegs() int { - return r.segs -} - -// VerifiedRevs returns the verified revocations. This should only be accessed -// after FullReplyProcessed channel has been closed. -func (r *ProcessedResult) VerifiedRevs() []*path_mgmt.SignedRevInfo { - return r.revs -} - -// Err indicates the error that happened when storing the segments. This should -// only be accessed after FullReplyProcessed channel has been closed. -func (r *ProcessedResult) Err() error { - return r.err -} - -// VerificationErrors returns the list of verification errors that happened. -func (r *ProcessedResult) VerificationErrors() []error { - return r.verifyErrs -} - -// ReplyHandler handles replies. -type ReplyHandler interface { - Handle(ctx context.Context, reply *path_mgmt.SegReply, server net.Addr, - earlyTrigger <-chan struct{}) *ProcessedResult -} - -// SegReplyHandler is a handler that verifies and stores seg replies. The -// handler supports an early trigger, so that a partial result can be stored -// early to possibly reply to clients earlier. -type SegReplyHandler struct { - Verifier Verifier - Storage Storage -} - -// Handle handles verifies and stores a single seg reply. -func (h *SegReplyHandler) Handle(ctx context.Context, reply *path_mgmt.SegReply, server net.Addr, - earlyTrigger <-chan struct{}) *ProcessedResult { - - result := &ProcessedResult{ - early: make(chan int, 1), - full: make(chan struct{}), - } - verifiedCh, units := h.Verifier.Verify(ctx, reply, server) - if units == 0 { - close(result.early) - close(result.full) - return result - } - - go func() { - defer log.LogPanicAndExit() - h.verifyAndStore(ctx, earlyTrigger, result, verifiedCh, units) - }() - return result -} - -func (h *SegReplyHandler) verifyAndStore(ctx context.Context, - earlyTrigger <-chan struct{}, result *ProcessedResult, - verifiedCh <-chan segverifier.UnitResult, units int) { - - verifiedUnits := make([]segverifier.UnitResult, 0, units) - var allVerifyErrs []error - totalSegsSaved := 0 - var allRevs []*path_mgmt.SignedRevInfo - defer close(result.full) - defer func() { - if earlyTrigger != nil { - // Unblock channel if done before triggered - result.early <- totalSegsSaved - } - }() - for u := 0; u < units; u++ { - select { - case verifiedUnit := <-verifiedCh: - verifiedUnits = append(verifiedUnits, verifiedUnit) - case <-earlyTrigger: - // Reduce u since this does not process an additional unit. - u-- - segs, revs, verifyErrs, err := h.storeResults(ctx, verifiedUnits) - allVerifyErrs = append(allVerifyErrs, verifyErrs...) - totalSegsSaved += segs - allRevs = append(allRevs, revs...) - result.early <- segs - // TODO(lukedirtwalker): log early store failure - if err == nil { - // clear already processed units - verifiedUnits = verifiedUnits[:0] - } - // Make sure we do not select from this channel again - earlyTrigger = nil - } - } - segs, revs, verifyErrs, err := h.storeResults(ctx, verifiedUnits) - result.verifyErrs = append(allVerifyErrs, verifyErrs...) - result.err = err - totalSegsSaved += segs - result.segs = totalSegsSaved - result.revs = append(allRevs, revs...) -} - -func (h *SegReplyHandler) storeResults(ctx context.Context, - verifiedUnits []segverifier.UnitResult) (int, []*path_mgmt.SignedRevInfo, []error, error) { - - var verifyErrs []error - segs := make([]*SegWithHP, 0, len(verifiedUnits)) - var revs []*path_mgmt.SignedRevInfo - for _, unit := range verifiedUnits { - if err := unit.SegError(); err != nil { - verifyErrs = append(verifyErrs, common.NewBasicError("Failed to verify seg", err, - "seg", unit.Unit.SegMeta.Segment)) - } else { - segs = append(segs, &SegWithHP{ - Seg: unit.Unit.SegMeta, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, - }) - } - for idx, rev := range unit.Unit.SRevInfos { - if err, ok := unit.Errors[idx]; ok { - verifyErrs = append(verifyErrs, common.NewBasicError("Failed to verify rev", err, - "rev", rev)) - } else { - revs = append(revs, rev) - } - } - } - if len(segs) > 0 { - if err := h.Storage.StoreSegs(ctx, segs); err != nil { - return 0, nil, verifyErrs, err - } - } - if len(revs) > 0 { - if err := h.Storage.StoreRevs(ctx, revs); err != nil { - return len(segs), nil, verifyErrs, h.Storage.StoreRevs(ctx, revs) - } - } - return len(segs), revs, verifyErrs, nil -} diff --git a/go/lib/infra/modules/seghandler/BUILD.bazel b/go/lib/infra/modules/seghandler/BUILD.bazel new file mode 100644 index 0000000000..85a44bfb8d --- /dev/null +++ b/go/lib/infra/modules/seghandler/BUILD.bazel @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "result.go", + "seghandler.go", + "storage.go", + "verifier.go", + ], + importpath = "github.com/scionproto/scion/go/lib/infra/modules/seghandler", + visibility = ["//visibility:public"], + deps = [ + "//go/lib/addr:go_default_library", + "//go/lib/common:go_default_library", + "//go/lib/ctrl/path_mgmt:go_default_library", + "//go/lib/ctrl/seg:go_default_library", + "//go/lib/hiddenpath:go_default_library", + "//go/lib/infra:go_default_library", + "//go/lib/infra/modules/segverifier:go_default_library", + "//go/lib/log:go_default_library", + "//go/lib/pathdb:go_default_library", + "//go/lib/pathdb/query:go_default_library", + "//go/lib/revcache:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["seghandler_test.go"], + embed = [":go_default_library"], + deps = [ + "//go/lib/common:go_default_library", + "//go/lib/ctrl/path_mgmt:go_default_library", + "//go/lib/ctrl/seg:go_default_library", + "//go/lib/infra:go_default_library", + "//go/lib/infra/modules/seghandler/mock_seghandler:go_default_library", + "//go/lib/infra/modules/segverifier:go_default_library", + "//go/lib/mocks/net/mock_net:go_default_library", + "//go/lib/xtest:go_default_library", + "//go/proto:go_default_library", + "@com_github_golang_mock//gomock:go_default_library", + "@com_github_stretchr_testify//assert:go_default_library", + ], +) diff --git a/go/lib/infra/modules/seghandler/mock_seghandler/BUILD.bazel b/go/lib/infra/modules/seghandler/mock_seghandler/BUILD.bazel new file mode 100644 index 0000000000..38ef87562c --- /dev/null +++ b/go/lib/infra/modules/seghandler/mock_seghandler/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["seghandler.go"], + importpath = "github.com/scionproto/scion/go/lib/infra/modules/seghandler/mock_seghandler", + visibility = ["//visibility:public"], + deps = [ + "//go/lib/ctrl/path_mgmt:go_default_library", + "//go/lib/infra/modules/seghandler:go_default_library", + "//go/lib/infra/modules/segverifier:go_default_library", + "@com_github_golang_mock//gomock:go_default_library", + ], +) diff --git a/go/lib/infra/modules/seghandler/mock_seghandler/seghandler.go b/go/lib/infra/modules/seghandler/mock_seghandler/seghandler.go new file mode 100644 index 0000000000..c792a44252 --- /dev/null +++ b/go/lib/infra/modules/seghandler/mock_seghandler/seghandler.go @@ -0,0 +1,104 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/scionproto/scion/go/lib/infra/modules/seghandler (interfaces: Storage,Verifier) + +// Package mock_seghandler is a generated GoMock package. +package mock_seghandler + +import ( + context "context" + gomock "github.com/golang/mock/gomock" + path_mgmt "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" + seghandler "github.com/scionproto/scion/go/lib/infra/modules/seghandler" + segverifier "github.com/scionproto/scion/go/lib/infra/modules/segverifier" + net "net" + reflect "reflect" +) + +// MockStorage is a mock of Storage interface +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder +} + +// MockStorageMockRecorder is the mock recorder for MockStorage +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// StoreRevs mocks base method +func (m *MockStorage) StoreRevs(arg0 context.Context, arg1 []*path_mgmt.SignedRevInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StoreRevs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// StoreRevs indicates an expected call of StoreRevs +func (mr *MockStorageMockRecorder) StoreRevs(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreRevs", reflect.TypeOf((*MockStorage)(nil).StoreRevs), arg0, arg1) +} + +// StoreSegs mocks base method +func (m *MockStorage) StoreSegs(arg0 context.Context, arg1 []*seghandler.SegWithHP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StoreSegs", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// StoreSegs indicates an expected call of StoreSegs +func (mr *MockStorageMockRecorder) StoreSegs(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreSegs", reflect.TypeOf((*MockStorage)(nil).StoreSegs), arg0, arg1) +} + +// MockVerifier is a mock of Verifier interface +type MockVerifier struct { + ctrl *gomock.Controller + recorder *MockVerifierMockRecorder +} + +// MockVerifierMockRecorder is the mock recorder for MockVerifier +type MockVerifierMockRecorder struct { + mock *MockVerifier +} + +// NewMockVerifier creates a new mock instance +func NewMockVerifier(ctrl *gomock.Controller) *MockVerifier { + mock := &MockVerifier{ctrl: ctrl} + mock.recorder = &MockVerifierMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockVerifier) EXPECT() *MockVerifierMockRecorder { + return m.recorder +} + +// Verify mocks base method +func (m *MockVerifier) Verify(arg0 context.Context, arg1 seghandler.Segments, arg2 net.Addr) (chan segverifier.UnitResult, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Verify", arg0, arg1, arg2) + ret0, _ := ret[0].(chan segverifier.UnitResult) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Verify indicates an expected call of Verify +func (mr *MockVerifierMockRecorder) Verify(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Verify", reflect.TypeOf((*MockVerifier)(nil).Verify), arg0, arg1, arg2) +} diff --git a/go/lib/infra/modules/seghandler/result.go b/go/lib/infra/modules/seghandler/result.go new file mode 100644 index 0000000000..523baa96cd --- /dev/null +++ b/go/lib/infra/modules/seghandler/result.go @@ -0,0 +1,62 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package seghandler + +import "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" + +// ProcessedResult is the result of handling a segment reply. +type ProcessedResult struct { + early chan int + full chan struct{} + segs int + revs []*path_mgmt.SignedRevInfo + err error + verifyErrs []error +} + +// EarlyTriggerProcessed returns a channel that will contain the number of +// successfully stored segments once it is done processing the early trigger. +func (r *ProcessedResult) EarlyTriggerProcessed() <-chan int { + return r.early +} + +// FullReplyProcessed returns a channel that will be closed once the full reply +// has been processed. +func (r *ProcessedResult) FullReplyProcessed() <-chan struct{} { + return r.full +} + +// VerifiedSegs returns the amount of verified segs. This should only be +// accessed after FullReplyProcessed channel has been closed. +func (r *ProcessedResult) VerifiedSegs() int { + return r.segs +} + +// VerifiedRevs returns the verified revocations. This should only be accessed +// after FullReplyProcessed channel has been closed. +func (r *ProcessedResult) VerifiedRevs() []*path_mgmt.SignedRevInfo { + return r.revs +} + +// Err indicates the error that happened when storing the segments. This should +// only be accessed after FullReplyProcessed channel has been closed. +func (r *ProcessedResult) Err() error { + return r.err +} + +// VerificationErrors returns the list of verification errors that happened. +func (r *ProcessedResult) VerificationErrors() []error { + return r.verifyErrs +} diff --git a/go/lib/infra/modules/seghandler/seghandler.go b/go/lib/infra/modules/seghandler/seghandler.go new file mode 100644 index 0000000000..4df32dca34 --- /dev/null +++ b/go/lib/infra/modules/seghandler/seghandler.go @@ -0,0 +1,162 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package seghandler + +import ( + "context" + "net" + + "github.com/scionproto/scion/go/lib/addr" + "github.com/scionproto/scion/go/lib/common" + "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" + "github.com/scionproto/scion/go/lib/ctrl/seg" + "github.com/scionproto/scion/go/lib/hiddenpath" + "github.com/scionproto/scion/go/lib/infra/modules/segverifier" + "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/pathdb/query" +) + +// Segments is a list of segments and revocations belonging to them. +// Optionally a hidden path group ID is attached. +type Segments struct { + Segs []*seg.Meta + SRevInfos []*path_mgmt.SignedRevInfo + HPGroupID hiddenpath.GroupId +} + +// Handler is a handler that verifies and stores seg replies. The handler +// supports an early trigger, so that a partial result can be stored early to +// possibly reply to clients earlier. +type Handler struct { + Verifier Verifier + Storage Storage +} + +// Handle handles verifies and stores a set of segments. +func (h *Handler) Handle(ctx context.Context, recs Segments, server net.Addr, + earlyTrigger <-chan struct{}) *ProcessedResult { + + result := &ProcessedResult{ + early: make(chan int, 1), + full: make(chan struct{}), + } + verifiedCh, units := h.Verifier.Verify(ctx, recs, server) + if units == 0 { + close(result.early) + close(result.full) + return result + } + + go func() { + defer log.LogPanicAndExit() + h.verifyAndStore(ctx, earlyTrigger, result, verifiedCh, + units, recs.HPGroupID) + }() + return result +} + +func (h *Handler) verifyAndStore(ctx context.Context, + earlyTrigger <-chan struct{}, result *ProcessedResult, + verifiedCh <-chan segverifier.UnitResult, + units int, hpGroupID hiddenpath.GroupId) { + + verifiedUnits := make([]segverifier.UnitResult, 0, units) + var allVerifyErrs []error + totalSegsSaved := 0 + var allRevs []*path_mgmt.SignedRevInfo + defer close(result.full) + defer func() { + if earlyTrigger != nil { + // Unblock channel if done before triggered + result.early <- totalSegsSaved + } + }() + for u := 0; u < units; u++ { + select { + case verifiedUnit := <-verifiedCh: + verifiedUnits = append(verifiedUnits, verifiedUnit) + case <-earlyTrigger: + // Reduce u since this does not process an additional unit. + u-- + segs, revs, verifyErrs, err := h.storeResults(ctx, verifiedUnits, hpGroupID) + allVerifyErrs = append(allVerifyErrs, verifyErrs...) + totalSegsSaved += segs + allRevs = append(allRevs, revs...) + result.early <- segs + // TODO(lukedirtwalker): log early store failure + if err == nil { + // clear already processed units + verifiedUnits = verifiedUnits[:0] + } + // Make sure we do not select from this channel again + earlyTrigger = nil + } + } + segs, revs, verifyErrs, err := h.storeResults(ctx, verifiedUnits, hpGroupID) + result.verifyErrs = append(allVerifyErrs, verifyErrs...) + result.err = err + totalSegsSaved += segs + result.segs = totalSegsSaved + result.revs = append(allRevs, revs...) +} + +func (h *Handler) storeResults(ctx context.Context, verifiedUnits []segverifier.UnitResult, + hpGroupID hiddenpath.GroupId) (int, []*path_mgmt.SignedRevInfo, []error, error) { + + var verifyErrs []error + segs := make([]*SegWithHP, 0, len(verifiedUnits)) + var revs []*path_mgmt.SignedRevInfo + for _, unit := range verifiedUnits { + if err := unit.SegError(); err != nil { + verifyErrs = append(verifyErrs, common.NewBasicError("Failed to verify seg", err, + "seg", unit.Unit.SegMeta.Segment)) + } else { + segs = append(segs, &SegWithHP{ + Seg: unit.Unit.SegMeta, + HPGroup: hpGroupID, + }) + } + for idx, rev := range unit.Unit.SRevInfos { + if err, ok := unit.Errors[idx]; ok { + verifyErrs = append(verifyErrs, common.NewBasicError("Failed to verify rev", err, + "rev", rev)) + } else { + revs = append(revs, rev) + } + } + } + if len(segs) > 0 { + if err := h.Storage.StoreSegs(ctx, segs); err != nil { + return 0, nil, verifyErrs, err + } + } + if len(revs) > 0 { + if err := h.Storage.StoreRevs(ctx, revs); err != nil { + return len(segs), nil, verifyErrs, h.Storage.StoreRevs(ctx, revs) + } + } + return len(segs), revs, verifyErrs, nil +} + +func convertHPGroupID(id hiddenpath.GroupId) []*query.HPCfgID { + return []*query.HPCfgID{ + { + IA: addr.IA{ + A: id.OwnerAS, + }, + ID: uint64(id.Suffix), + }, + } +} diff --git a/go/lib/infra/modules/segfetcher/segreplyhandler_test.go b/go/lib/infra/modules/seghandler/seghandler_test.go similarity index 71% rename from go/lib/infra/modules/segfetcher/segreplyhandler_test.go rename to go/lib/infra/modules/seghandler/seghandler_test.go index d9269a8b14..74de44c129 100644 --- a/go/lib/infra/modules/segfetcher/segreplyhandler_test.go +++ b/go/lib/infra/modules/seghandler/seghandler_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package segfetcher_test +package seghandler_test import ( "context" @@ -26,11 +26,10 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/ctrl/seg" "github.com/scionproto/scion/go/lib/infra" - "github.com/scionproto/scion/go/lib/infra/modules/segfetcher" - "github.com/scionproto/scion/go/lib/infra/modules/segfetcher/mock_segfetcher" + "github.com/scionproto/scion/go/lib/infra/modules/seghandler" + "github.com/scionproto/scion/go/lib/infra/modules/seghandler/mock_seghandler" "github.com/scionproto/scion/go/lib/infra/modules/segverifier" "github.com/scionproto/scion/go/lib/mocks/net/mock_net" - "github.com/scionproto/scion/go/lib/pathdb/query" "github.com/scionproto/scion/go/lib/xtest" "github.com/scionproto/scion/go/proto" ) @@ -44,21 +43,21 @@ func TestReplyHandlerEmptyReply(t *testing.T) { ctx, cancelF := context.WithTimeout(context.Background(), TestTimeout) defer cancelF() - reply := &path_mgmt.SegReply{} + segs := seghandler.Segments{} earlyTrigger := make(chan struct{}) verified := make(chan segverifier.UnitResult) close(verified) server := mock_net.NewMockAddr(ctrl) - storage := mock_segfetcher.NewMockStorage(ctrl) - verifier := mock_segfetcher.NewMockVerifier(ctrl) - verifier.EXPECT().Verify(ctx, reply, gomock.Eq(server)).Return(verified, 0) - handler := segfetcher.SegReplyHandler{ + storage := mock_seghandler.NewMockStorage(ctrl) + verifier := mock_seghandler.NewMockVerifier(ctrl) + verifier.EXPECT().Verify(ctx, segs, gomock.Eq(server)).Return(verified, 0) + handler := seghandler.Handler{ Storage: storage, Verifier: verifier, } - r := handler.Handle(ctx, reply, server, earlyTrigger) + r := handler.Handle(ctx, segs, server, earlyTrigger) AssertRead(t, r.EarlyTriggerProcessed(), 0, time.Second/2) xtest.AssertReadReturnsBefore(t, r.FullReplyProcessed(), time.Second/2) assert.NoError(t, r.Err()) @@ -75,7 +74,7 @@ func TestReplyHandlerErrors(t *testing.T) { ctx, cancelF := context.WithTimeout(context.Background(), TestTimeout) defer cancelF() - reply := &path_mgmt.SegReply{} + segs := seghandler.Segments{} earlyTrigger := make(chan struct{}) verified := make(chan segverifier.UnitResult) @@ -88,14 +87,14 @@ func TestReplyHandlerErrors(t *testing.T) { rev1, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) xtest.FailOnErr(t, err) - storage := mock_segfetcher.NewMockStorage(ctrl) - verifier := mock_segfetcher.NewMockVerifier(ctrl) - verifier.EXPECT().Verify(ctx, reply, gomock.Any()).Return(verified, 3) - handler := segfetcher.SegReplyHandler{ + storage := mock_seghandler.NewMockStorage(ctrl) + verifier := mock_seghandler.NewMockVerifier(ctrl) + verifier.EXPECT().Verify(ctx, segs, gomock.Any()).Return(verified, 3) + handler := seghandler.Handler{ Storage: storage, Verifier: verifier, } - r := handler.Handle(ctx, reply, nil, earlyTrigger) + r := handler.Handle(ctx, segs, nil, earlyTrigger) verified <- segverifier.UnitResult{ Unit: &segverifier.Unit{ SegMeta: &seg.Meta{}, @@ -133,39 +132,36 @@ func TestReplyHandlerNoErrors(t *testing.T) { ctx, cancelF := context.WithTimeout(context.Background(), TestTimeout) defer cancelF() - seg1 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_down}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg1 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_down}, } - seg2 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_up}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg2 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_up}, } - seg3 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_core}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg3 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_core}, } rev1, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) xtest.FailOnErr(t, err) - reply := &path_mgmt.SegReply{} + segs := seghandler.Segments{} earlyTrigger := make(chan struct{}) verified := make(chan segverifier.UnitResult) - storage := mock_segfetcher.NewMockStorage(ctrl) - verifier := mock_segfetcher.NewMockVerifier(ctrl) - verifier.EXPECT().Verify(ctx, reply, gomock.Any()).Return(verified, 3) - handler := segfetcher.SegReplyHandler{ + storage := mock_seghandler.NewMockStorage(ctrl) + verifier := mock_seghandler.NewMockVerifier(ctrl) + verifier.EXPECT().Verify(ctx, segs, gomock.Any()).Return(verified, 3) + handler := seghandler.Handler{ Storage: storage, Verifier: verifier, } seg1Store := storage.EXPECT().StoreSegs(gomock.Any(), - gomock.Eq([]*segfetcher.SegWithHP{seg1})) + gomock.Eq([]*seghandler.SegWithHP{seg1})) storage.EXPECT().StoreSegs(gomock.Any(), - gomock.Eq([]*segfetcher.SegWithHP{seg2, seg3})).After(seg1Store) + gomock.Eq([]*seghandler.SegWithHP{seg2, seg3})).After(seg1Store) storage.EXPECT().StoreRevs(gomock.Any(), gomock.Eq([]*path_mgmt.SignedRevInfo{rev1})).After(seg1Store) - r := handler.Handle(ctx, reply, nil, earlyTrigger) + r := handler.Handle(ctx, segs, nil, earlyTrigger) verified <- segverifier.UnitResult{ Unit: &segverifier.Unit{ SegMeta: seg1.Seg, @@ -197,33 +193,31 @@ func TestReplyHandlerAllVerifiedInEarlyInterval(t *testing.T) { ctx, cancelF := context.WithTimeout(context.Background(), TestTimeout) defer cancelF() - seg1 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_down}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg1 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_down}, } - seg2 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_up}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg2 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_up}, } rev1, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) xtest.FailOnErr(t, err) - reply := &path_mgmt.SegReply{} + segs := seghandler.Segments{} earlyTrigger := make(chan struct{}) verified := make(chan segverifier.UnitResult) - storage := mock_segfetcher.NewMockStorage(ctrl) - verifier := mock_segfetcher.NewMockVerifier(ctrl) - verifier.EXPECT().Verify(ctx, reply, gomock.Any()).Return(verified, 2) - handler := segfetcher.SegReplyHandler{ + storage := mock_seghandler.NewMockStorage(ctrl) + verifier := mock_seghandler.NewMockVerifier(ctrl) + verifier.EXPECT().Verify(ctx, segs, gomock.Any()).Return(verified, 2) + handler := seghandler.Handler{ Storage: storage, Verifier: verifier, } storage.EXPECT().StoreSegs(gomock.Any(), - gomock.Eq([]*segfetcher.SegWithHP{seg1, seg2})) + gomock.Eq([]*seghandler.SegWithHP{seg1, seg2})) storage.EXPECT().StoreRevs(gomock.Any(), gomock.Eq([]*path_mgmt.SignedRevInfo{rev1})) - r := handler.Handle(ctx, reply, nil, earlyTrigger) + r := handler.Handle(ctx, segs, nil, earlyTrigger) verified <- segverifier.UnitResult{ Unit: &segverifier.Unit{ SegMeta: seg1.Seg, @@ -249,36 +243,34 @@ func TestReplyHandlerEarlyTriggerStorageError(t *testing.T) { ctx, cancelF := context.WithTimeout(context.Background(), TestTimeout) defer cancelF() - seg1 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_down}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg1 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_down}, } - seg2 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_up}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg2 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_up}, } rev1, err := path_mgmt.NewSignedRevInfo(&path_mgmt.RevInfo{}, infra.NullSigner) xtest.FailOnErr(t, err) - reply := &path_mgmt.SegReply{} + segs := seghandler.Segments{} earlyTrigger := make(chan struct{}) verified := make(chan segverifier.UnitResult) - storage := mock_segfetcher.NewMockStorage(ctrl) - verifier := mock_segfetcher.NewMockVerifier(ctrl) - verifier.EXPECT().Verify(ctx, reply, gomock.Any()).Return(verified, 2) - handler := segfetcher.SegReplyHandler{ + storage := mock_seghandler.NewMockStorage(ctrl) + verifier := mock_seghandler.NewMockVerifier(ctrl) + verifier.EXPECT().Verify(ctx, segs, gomock.Any()).Return(verified, 2) + handler := seghandler.Handler{ Storage: storage, Verifier: verifier, } seg1Store := storage.EXPECT().StoreSegs(gomock.Any(), - gomock.Eq([]*segfetcher.SegWithHP{seg1})). + gomock.Eq([]*seghandler.SegWithHP{seg1})). Return(common.NewBasicError("Test error", nil)) storage.EXPECT().StoreSegs(gomock.Any(), - gomock.Eq([]*segfetcher.SegWithHP{seg1, seg2})).After(seg1Store) + gomock.Eq([]*seghandler.SegWithHP{seg1, seg2})).After(seg1Store) storage.EXPECT().StoreRevs(gomock.Any(), gomock.Eq([]*path_mgmt.SignedRevInfo{rev1})) - r := handler.Handle(ctx, reply, nil, earlyTrigger) + r := handler.Handle(ctx, segs, nil, earlyTrigger) verified <- segverifier.UnitResult{ Unit: &segverifier.Unit{ SegMeta: seg1.Seg, @@ -305,32 +297,30 @@ func TestReplyHandlerStorageError(t *testing.T) { ctx, cancelF := context.WithTimeout(context.Background(), TestTimeout) defer cancelF() - seg1 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_down}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg1 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_down}, } - seg2 := &segfetcher.SegWithHP{ - Seg: &seg.Meta{Type: proto.PathSegType_up}, - HPCfgIds: []*query.HPCfgID{&query.NullHpCfgID}, + seg2 := &seghandler.SegWithHP{ + Seg: &seg.Meta{Type: proto.PathSegType_up}, } - reply := &path_mgmt.SegReply{} + segs := seghandler.Segments{} earlyTrigger := make(chan struct{}) verified := make(chan segverifier.UnitResult) - storage := mock_segfetcher.NewMockStorage(ctrl) - verifier := mock_segfetcher.NewMockVerifier(ctrl) - verifier.EXPECT().Verify(ctx, reply, gomock.Any()).Return(verified, 2) - handler := segfetcher.SegReplyHandler{ + storage := mock_seghandler.NewMockStorage(ctrl) + verifier := mock_seghandler.NewMockVerifier(ctrl) + verifier.EXPECT().Verify(ctx, segs, gomock.Any()).Return(verified, 2) + handler := seghandler.Handler{ Storage: storage, Verifier: verifier, } storageErr := common.NewBasicError("Test error", nil) storage.EXPECT().StoreSegs(gomock.Any(), - gomock.Eq([]*segfetcher.SegWithHP{seg1, seg2})). + gomock.Eq([]*seghandler.SegWithHP{seg1, seg2})). Return(storageErr) close(earlyTrigger) - r := handler.Handle(ctx, reply, nil, earlyTrigger) + r := handler.Handle(ctx, segs, nil, earlyTrigger) AssertRead(t, r.EarlyTriggerProcessed(), 0, time.Second/2) verified <- segverifier.UnitResult{ Unit: &segverifier.Unit{ diff --git a/go/lib/infra/modules/seghandler/storage.go b/go/lib/infra/modules/seghandler/storage.go new file mode 100644 index 0000000000..e9f3a99e0b --- /dev/null +++ b/go/lib/infra/modules/seghandler/storage.go @@ -0,0 +1,92 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package seghandler + +import ( + "context" + "fmt" + "sort" + + "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" + "github.com/scionproto/scion/go/lib/ctrl/seg" + "github.com/scionproto/scion/go/lib/hiddenpath" + "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/pathdb" + "github.com/scionproto/scion/go/lib/revcache" +) + +// SegWithHP is a segment with hidden path cfg ids. +type SegWithHP struct { + Seg *seg.Meta + HPGroup hiddenpath.GroupId +} + +func (s *SegWithHP) String() string { + return fmt.Sprintf("{Seg: %v, HPGroup: %s}", s.Seg, s.HPGroup) +} + +// Storage is used to store segments and revocations. +type Storage interface { + StoreSegs(context.Context, []*SegWithHP) error + StoreRevs(context.Context, []*path_mgmt.SignedRevInfo) error +} + +// DefaultStorage wraps path DB and revocation cache and offers +// convenience methods that implement the Storage interface. +type DefaultStorage struct { + PathDB pathdb.PathDB + RevCache revcache.RevCache +} + +// StoreSegs stores the given segments in the pathdb in a transaction. +func (s *DefaultStorage) StoreSegs(ctx context.Context, segs []*SegWithHP) error { + tx, err := s.PathDB.BeginTransaction(ctx, nil) + if err != nil { + return err + } + // Sort to prevent sql deadlock. + sort.Slice(segs, func(i, j int) bool { + return segs[i].Seg.Segment.GetLoggingID() < segs[j].Seg.Segment.GetLoggingID() + }) + var insertedSegmentIDs []string + defer tx.Rollback() + for _, seg := range segs { + n, err := tx.InsertWithHPCfgIDs(ctx, seg.Seg, convertHPGroupID(seg.HPGroup)) + if err != nil { + return err + } else if n > 0 { + insertedSegmentIDs = append(insertedSegmentIDs, seg.Seg.Segment.GetLoggingID()) + } + } + if err := tx.Commit(); err != nil { + return err + } + if len(insertedSegmentIDs) > 0 { + log.FromCtx(ctx).Debug("Segments inserted in DB", "segments", insertedSegmentIDs) + } + return nil +} + +// StoreRevs stores the given revocations in the revocation cache. +func (s *DefaultStorage) StoreRevs(ctx context.Context, + revs []*path_mgmt.SignedRevInfo) error { + + for _, rev := range revs { + if _, err := s.RevCache.Insert(ctx, rev); err != nil { + return err + } + } + return nil +} diff --git a/go/lib/infra/modules/seghandler/verifier.go b/go/lib/infra/modules/seghandler/verifier.go new file mode 100644 index 0000000000..1f5459da4c --- /dev/null +++ b/go/lib/infra/modules/seghandler/verifier.go @@ -0,0 +1,41 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package seghandler + +import ( + "context" + "net" + + "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/modules/segverifier" +) + +// Verifier is used to verify a segment reply. +type Verifier interface { + Verify(context.Context, Segments, net.Addr) (chan segverifier.UnitResult, int) +} + +// DefaultVerifier is a convenience wrapper around segverifier that implements +// the Verifier interface. +type DefaultVerifier struct { + Verifier infra.Verifier +} + +// Verify calls segverifier for the given reply. +func (v *DefaultVerifier) Verify(ctx context.Context, recs Segments, + server net.Addr) (chan segverifier.UnitResult, int) { + + return segverifier.StartVerification(ctx, v.Verifier, server, recs.Segs, recs.SRevInfos) +} diff --git a/go/lib/infra/modules/segverifier/segverifier.go b/go/lib/infra/modules/segverifier/segverifier.go index 79dc68ea9c..581085a638 100644 --- a/go/lib/infra/modules/segverifier/segverifier.go +++ b/go/lib/infra/modules/segverifier/segverifier.go @@ -45,53 +45,6 @@ const ( segErrIndex = -1 ) -// SegVerified is the callback for a successful segment verification. -// The function must adhere to the given context. -type SegVerified func(context.Context, *seg.Meta) - -// SegVerificationFailed is the callback for a failed segment verification. -// The function must return immediately. -type SegVerificationFailed func(*seg.Meta, error) - -// RevVerified is the callback for a successful revocation verification. -// The function must adhere to the given context. -type RevVerified func(context.Context, *path_mgmt.SignedRevInfo) - -// RevVerificationFailed is the callback for a failed revocation verification. -// The function must return immediately. -type RevVerificationFailed func(*path_mgmt.SignedRevInfo, error) - -// Verify starts the verification for the given segMeta and sRevInfos. -// The verifiedSeg and verifiedRev callbacks are called for verified segs/revs. -// The segError/revError callbacks are called for verification errors. -func Verify(ctx context.Context, verifier infra.Verifier, server net.Addr, segMetas []*seg.Meta, - sRevInfos []*path_mgmt.SignedRevInfo, verifiedSeg SegVerified, verifiedRev RevVerified, - segError SegVerificationFailed, revError RevVerificationFailed) { - - unitResultsC, units := StartVerification(ctx, verifier, server, segMetas, sRevInfos) -Loop: - for numResults := 0; numResults < units; numResults++ { - select { - case result := <-unitResultsC: - // Insert successfully verified revocations into the revcache - for index, revocation := range result.Unit.SRevInfos { - if err, ok := result.Errors[index]; ok { - revError(revocation, err) - } else { - verifiedRev(ctx, revocation) - } - } - if err := result.SegError(); err != nil { - segError(result.Unit.SegMeta, err) - } else { - verifiedSeg(ctx, result.Unit.SegMeta) - } - case <-ctx.Done(): - break Loop - } - } -} - // StartVerification builds the units for the given segMetas and sRevInfos // and spawns verify method on the units. // StartVerification returns a channel for the UnitResult and the expected amount of results. diff --git a/go/path_srv/internal/handlers/BUILD.bazel b/go/path_srv/internal/handlers/BUILD.bazel index 6d5f61b562..de5982f109 100644 --- a/go/path_srv/internal/handlers/BUILD.bazel +++ b/go/path_srv/internal/handlers/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//go/lib/infra:go_default_library", "//go/lib/infra/messenger:go_default_library", "//go/lib/infra/modules/segfetcher:go_default_library", + "//go/lib/infra/modules/seghandler:go_default_library", "//go/lib/infra/modules/segverifier:go_default_library", "//go/lib/log:go_default_library", "//go/lib/pathdb:go_default_library", diff --git a/go/path_srv/internal/handlers/common.go b/go/path_srv/internal/handlers/common.go index 02d1a62694..bc5a10d74a 100644 --- a/go/path_srv/internal/handlers/common.go +++ b/go/path_srv/internal/handlers/common.go @@ -16,21 +16,15 @@ package handlers import ( "context" - "net" - "sort" - "sync" "time" "github.com/opentracing/opentracing-go" "github.com/scionproto/scion/go/lib/addr" "github.com/scionproto/scion/go/lib/common" - "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/ctrl/seg" "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/infra/modules/segfetcher" - "github.com/scionproto/scion/go/lib/infra/modules/segverifier" - "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/pathdb" "github.com/scionproto/scion/go/lib/pathdb/query" "github.com/scionproto/scion/go/lib/revcache" @@ -135,67 +129,3 @@ func (h *baseHandler) sleepOrTimeout(ctx context.Context) error { return nil } } - -func (h *baseHandler) verifyAndStore(ctx context.Context, src net.Addr, - recs []*seg.Meta, revInfos []*path_mgmt.SignedRevInfo) error { - // TODO(lukedirtwalker): collect the verified segs/revoc and return them. - - logger := log.FromCtx(ctx) - // verify and store the segments - var insertedSegmentIDs []string - var mtx sync.Mutex - verifiedSegs := make([]*seg.Meta, 0, len(recs)) - verifiedSeg := func(ctx context.Context, s *seg.Meta) { - mtx.Lock() - defer mtx.Unlock() - verifiedSegs = append(verifiedSegs, s) - } - verifiedRev := func(ctx context.Context, rev *path_mgmt.SignedRevInfo) { - if _, err := h.revCache.Insert(ctx, rev); err != nil { - logger.Error("Unable to insert revocation into revcache", "rev", rev, "err", err) - } - } - segErr := func(s *seg.Meta, err error) { - logger.Warn("Segment verification failed", "segment", s.Segment, "err", err) - } - revErr := func(revocation *path_mgmt.SignedRevInfo, err error) { - logger.Warn("Revocation verification failed", "revocation", revocation, "err", err) - } - segverifier.Verify(ctx, h.verifierFactory.NewVerifier(), src, recs, revInfos, verifiedSeg, - verifiedRev, segErr, revErr) - - // Return early if we have nothing to insert. - if len(verifiedSegs) == 0 { - return common.NewBasicError(NoSegmentsErr, nil) - } - tx, err := h.pathDB.BeginTransaction(ctx, nil) - if err != nil { - return err - } - // sort to prevent sql deadlock - sort.Slice(verifiedSegs, func(i, j int) bool { - return verifiedSegs[i].Segment.GetLoggingID() < verifiedSegs[j].Segment.GetLoggingID() - }) - for _, s := range verifiedSegs { - n, err := tx.Insert(ctx, s) - if err != nil { - if errRollback := tx.Rollback(); errRollback != nil { - err = common.NewBasicError("Unable to rollback", err, "rollbackErr", errRollback) - } - return common.NewBasicError("Unable to insert segment into path database", err, - "seg", s.Segment) - } - if wasInserted := n > 0; wasInserted { - insertedSegmentIDs = append(insertedSegmentIDs, s.Segment.GetLoggingID()) - } - } - err = tx.Commit() - if err != nil { - return common.NewBasicError("Failed to commit transaction", err) - } - if len(insertedSegmentIDs) > 0 { - logger.Debug("Segments inserted in DB", "count", len(insertedSegmentIDs), - "segments", insertedSegmentIDs) - } - return nil -} diff --git a/go/path_srv/internal/handlers/segreg.go b/go/path_srv/internal/handlers/segreg.go index 06fe6f7e38..d1e8ca3821 100644 --- a/go/path_srv/internal/handlers/segreg.go +++ b/go/path_srv/internal/handlers/segreg.go @@ -20,6 +20,7 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/infra/messenger" + "github.com/scionproto/scion/go/lib/infra/modules/seghandler" "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/snet" "github.com/scionproto/scion/go/proto" @@ -28,6 +29,7 @@ import ( type segRegHandler struct { *baseHandler localIA addr.IA + handler seghandler.Handler } func NewSegRegHandler(args HandlerArgs) infra.Handler { @@ -35,6 +37,15 @@ func NewSegRegHandler(args HandlerArgs) infra.Handler { handler := &segRegHandler{ baseHandler: newBaseHandler(r, args), localIA: args.IA, + handler: seghandler.Handler{ + Verifier: &seghandler.DefaultVerifier{ + Verifier: args.VerifierFactory.NewVerifier(), + }, + Storage: &seghandler.DefaultStorage{ + PathDB: args.PathDB, + RevCache: args.RevCache, + }, + }, } return handler.Handle() } @@ -76,7 +87,14 @@ func (h *segRegHandler) Handle() *infra.HandlerResult { NextHop: peerPath.OverlayNextHop(), Host: addr.NewSVCUDPAppAddr(addr.SvcBS), } - if err := h.verifyAndStore(ctx, svcToQuery, segReg.Recs, segReg.SRevInfos); err != nil { + segs := seghandler.Segments{ + Segs: segReg.Recs, + SRevInfos: segReg.SRevInfos, + } + res := h.handler.Handle(ctx, segs, svcToQuery, nil) + // wait until processing is done. + <-res.FullReplyProcessed() + if err := res.Err(); err != nil { sendAck(proto.Ack_ErrCode_reject, err.Error()) return infra.MetricsErrInvalid } diff --git a/go/path_srv/internal/handlers/segsync.go b/go/path_srv/internal/handlers/segsync.go index 08f2e129df..64023e9b81 100644 --- a/go/path_srv/internal/handlers/segsync.go +++ b/go/path_srv/internal/handlers/segsync.go @@ -20,6 +20,7 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/infra" "github.com/scionproto/scion/go/lib/infra/messenger" + "github.com/scionproto/scion/go/lib/infra/modules/seghandler" "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/snet" "github.com/scionproto/scion/go/proto" @@ -28,6 +29,7 @@ import ( type syncHandler struct { *baseHandler localIA addr.IA + handler seghandler.Handler } func NewSyncHandler(args HandlerArgs) infra.Handler { @@ -35,6 +37,15 @@ func NewSyncHandler(args HandlerArgs) infra.Handler { handler := &syncHandler{ baseHandler: newBaseHandler(r, args), localIA: args.IA, + handler: seghandler.Handler{ + Verifier: &seghandler.DefaultVerifier{ + Verifier: args.VerifierFactory.NewVerifier(), + }, + Storage: &seghandler.DefaultStorage{ + PathDB: args.PathDB, + RevCache: args.RevCache, + }, + }, } return handler.Handle() } @@ -75,7 +86,14 @@ func (h *syncHandler) Handle() *infra.HandlerResult { NextHop: peerPath.OverlayNextHop(), Host: addr.NewSVCUDPAppAddr(addr.SvcPS), } - if err := h.verifyAndStore(ctx, svcToQuery, segSync.Recs, segSync.SRevInfos); err != nil { + segs := seghandler.Segments{ + Segs: segSync.Recs, + SRevInfos: segSync.SRevInfos, + } + res := h.handler.Handle(ctx, segs, svcToQuery, nil) + // wait until processing is done. + <-res.FullReplyProcessed() + if err := res.Err(); err != nil { sendAck(proto.Ack_ErrCode_reject, err.Error()) return infra.MetricsErrInvalid } diff --git a/tools/gomocks b/tools/gomocks index e8ccbaa7c3..37ba4a49e9 100755 --- a/tools/gomocks +++ b/tools/gomocks @@ -42,7 +42,8 @@ MOCK_TARGETS = [ (SCION_PACKAGE_PREFIX + "/go/lib/infra/messenger", "LocalSVCRouter,Resolver"), (SCION_PACKAGE_PREFIX + "/go/lib/infra/modules/segfetcher", "DstProvider,ReplyHandler,Requester,RequestAPI,Resolver," + - "Splitter,Storage,Validator,Verifier"), + "Splitter,Validator"), + (SCION_PACKAGE_PREFIX + "/go/lib/infra/modules/seghandler", "Storage,Verifier"), (SCION_PACKAGE_PREFIX + "/go/lib/infra/modules/trust/trustdb", "TrustDB"), (SCION_PACKAGE_PREFIX + "/go/lib/l4", "L4Header"), (SCION_PACKAGE_PREFIX + "/go/lib/log", "Handler,Logger"),