Skip to content

Commit

Permalink
Disambiguate whether a revalidator recognized a request when checking…
Browse files Browse the repository at this point in the history
… for a need to revalidate (#87)

* feat(revalidator): add handled bool

Previously, the revalidators were very ambiguous in identifying if they had actually handled a
request for a revalidation check or not -- a "nil, nil" or "voucher result, nil" was interpreted as
not being handled. This is unfortunate, cause particularly a "voucher result, nil" is an indication
the request is handled by this check, and we should stop processing other revalidators that might
override the value with "nil, nil". We now add a boolean to disambiguate whether the revalidator
"recognized" this request --- if true, processing stops.

* style(comments): fix types
  • Loading branch information
hannahhoward authored Sep 29, 2020
1 parent 694cf47 commit 8a1e460
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 26 deletions.
24 changes: 18 additions & 6 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, si
if chid.Initiator != m.peerID {
var result datatransfer.VoucherResult
var err error
var handled bool
_ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error {
revalidator := processor.(datatransfer.Revalidator)
result, err = revalidator.OnPushDataReceived(chid, size)
return err
handled, result, err = revalidator.OnPushDataReceived(chid, size)
if handled {
return errors.New("stop processing")
}
return nil
})
if err != nil || result != nil {
msg, err := m.processRevalidationResult(chid, result, err)
Expand All @@ -58,10 +62,14 @@ func (m *manager) OnDataSent(chid datatransfer.ChannelID, link ipld.Link, size u
if chid.Initiator != m.peerID {
var result datatransfer.VoucherResult
var err error
var handled bool
_ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error {
revalidator := processor.(datatransfer.Revalidator)
result, err = revalidator.OnPullDataSent(chid, size)
return err
handled, result, err = revalidator.OnPullDataSent(chid, size)
if handled {
return errors.New("stop processing")
}
return nil
})
if err != nil || result != nil {
return m.processRevalidationResult(chid, result, err)
Expand Down Expand Up @@ -330,10 +338,14 @@ func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result
func (m *manager) completeMessage(chid datatransfer.ChannelID) (datatransfer.Response, error) {
var result datatransfer.VoucherResult
var resultErr error
var handled bool
_ = m.revalidators.Each(func(_ datatransfer.TypeIdentifier, _ encoding.Decoder, processor registry.Processor) error {
revalidator := processor.(datatransfer.Revalidator)
result, resultErr = revalidator.OnComplete(chid)
return resultErr
handled, result, resultErr = revalidator.OnComplete(chid)
if handled {
return errors.New("stop processing")
}
return nil
})
if result != nil {
err := m.channels.NewVoucherResult(chid, result)
Expand Down
14 changes: 7 additions & 7 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,21 +593,21 @@ type retrievalRevalidator struct {
finalVoucher datatransfer.VoucherResult
}

func (r *retrievalRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (datatransfer.VoucherResult, error) {
func (r *retrievalRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (bool, datatransfer.VoucherResult, error) {
r.dataSoFar += additionalBytesSent
if r.providerPausePoint < len(r.pausePoints) &&
r.dataSoFar >= r.pausePoints[r.providerPausePoint] {
r.providerPausePoint++
return testutil.NewFakeDTType(), datatransfer.ErrPause
return true, testutil.NewFakeDTType(), datatransfer.ErrPause
}
return nil, nil
return true, nil, nil
}

func (r *retrievalRevalidator) OnPushDataReceived(chid datatransfer.ChannelID, additionalBytesReceived uint64) (datatransfer.VoucherResult, error) {
return nil, nil
func (r *retrievalRevalidator) OnPushDataReceived(chid datatransfer.ChannelID, additionalBytesReceived uint64) (bool, datatransfer.VoucherResult, error) {
return false, nil, nil
}
func (r *retrievalRevalidator) OnComplete(chid datatransfer.ChannelID) (datatransfer.VoucherResult, error) {
return r.finalVoucher, datatransfer.ErrPause
func (r *retrievalRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
return true, r.finalVoucher, datatransfer.ErrPause
}

func TestSimulatedRetrievalFlow(t *testing.T) {
Expand Down
24 changes: 17 additions & 7 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,30 @@ type Revalidator interface {
// Revalidate revalidates a request with a new voucher
Revalidate(channelID ChannelID, voucher Voucher) (VoucherResult, error)
// OnPullDataSent is called on the responder side when more bytes are sent
// for a given pull request. It should return a VoucherResult + ErrPause to
// for a given pull request. The first value indicates whether the request was
// recognized by this revalidator and should be considered 'handled'. If true,
// the remaining two values are interpreted. If 'false' the request is passed on
// to the next revalidators.
// It should return a VoucherResult + ErrPause to
// request revalidation or nil to continue uninterrupted,
// other errors will terminate the request
OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (VoucherResult, error)
// other errors will terminate the request.
OnPullDataSent(chid ChannelID, additionalBytesSent uint64) (bool, VoucherResult, error)
// OnPushDataReceived is called on the responder side when more bytes are received
// for a given push request. It should return a VoucherResult + ErrPause to
// for a given push request. The first value indicates whether the request was
// recognized by this revalidator and should be considered 'handled'. If true,
// the remaining two values are interpreted. If 'false' the request is passed on
// to the next revalidators. It should return a VoucherResult + ErrPause to
// request revalidation or nil to continue uninterrupted,
// other errors will terminate the request
OnPushDataReceived(chid ChannelID, additionalBytesReceived uint64) (VoucherResult, error)
OnPushDataReceived(chid ChannelID, additionalBytesReceived uint64) (bool, VoucherResult, error)
// OnComplete is called to make a final request for revalidation -- often for the
// purpose of settlement.
// purpose of settlement. The first value indicates whether the request was
// recognized by this revalidator and should be considered 'handled'. If true,
// the remaining two values are interpreted. If 'false' the request is passed on
// to the next revalidators.
// if VoucherResult is non nil, the request will enter a settlement phase awaiting
// a final update
OnComplete(chid ChannelID) (VoucherResult, error)
OnComplete(chid ChannelID) (bool, VoucherResult, error)
}

// TransportConfigurer provides a mechanism to provide transport specific configuration for a given voucher type
Expand Down
12 changes: 6 additions & 6 deletions testutil/stubbedvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,21 @@ func NewStubbedRevalidator() *StubbedRevalidator {
}

// OnPullDataSent returns a stubbed result for checking when pull data is sent
func (srv *StubbedRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (datatransfer.VoucherResult, error) {
func (srv *StubbedRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (bool, datatransfer.VoucherResult, error) {
srv.didPullCheck = true
return srv.revalidationResult, srv.pullCheckError
return srv.expectPullCheck, srv.revalidationResult, srv.pullCheckError
}

// OnPushDataReceived returns a stubbed result for checking when push data is received
func (srv *StubbedRevalidator) OnPushDataReceived(chid datatransfer.ChannelID, additionalBytesReceived uint64) (datatransfer.VoucherResult, error) {
func (srv *StubbedRevalidator) OnPushDataReceived(chid datatransfer.ChannelID, additionalBytesReceived uint64) (bool, datatransfer.VoucherResult, error) {
srv.didPushCheck = true
return srv.revalidationResult, srv.pushCheckError
return srv.expectPushCheck, srv.revalidationResult, srv.pushCheckError
}

// OnComplete returns a stubbed result for checking when the requests completes
func (srv *StubbedRevalidator) OnComplete(chid datatransfer.ChannelID) (datatransfer.VoucherResult, error) {
func (srv *StubbedRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
srv.didComplete = true
return srv.revalidationResult, srv.completeError
return srv.expectComplete, srv.revalidationResult, srv.completeError
}

// Revalidate returns a stubbed result for revalidating a request
Expand Down

0 comments on commit 8a1e460

Please sign in to comment.