fix: push channel monitor watch for accept and complete events
dirkmc committed Mar 12, 2021
1 parent 20d3d75 commit 733e6f5
Showing 4 changed files with 345 additions and 58 deletions.
74 changes: 50 additions & 24 deletions impl/impl.go
@@ -93,29 +93,9 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {

// PushChannelRestartConfig sets the configuration options for automatically
// restarting push channels
// - interval is the time over which minBytesSent must have been sent
// - checksPerInterval is the number of times to check per interval
// - minBytesSent is the minimum amount of data that must have been sent over
// the interval
// - restartBackoff is the time to wait before checking again for restarts
// - maxConsecutiveRestarts is the maximum number of restarts in a row to
// attempt where no data is transferred. When the limit is reached the
// channel is closed.
func PushChannelRestartConfig(
interval time.Duration,
checksPerInterval uint32,
minBytesSent uint64,
restartBackoff time.Duration,
maxConsecutiveRestarts uint32,
) DataTransferOption {
func PushChannelRestartConfig(cfg pushchannelmonitor.Config) DataTransferOption {
return func(m *manager) {
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
MaxConsecutiveRestarts: maxConsecutiveRestarts,
}
m.pushChannelMonitorCfg = &cfg
}
}
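
With this change, callers build the monitor configuration themselves and pass it in as a single struct. A minimal sketch of the new call style follows; the field values are illustrative only, the ds/tmpDir/dtNet/transport/storedCounter dependencies stand in for the caller's own, and the AcceptTimeout/CompleteTimeout comments are inferred from the commit title rather than from documented behaviour:

restartConf := PushChannelRestartConfig(pushchannelmonitor.Config{
	AcceptTimeout:          30 * time.Second, // assumed: max wait for the Accept event after opening the channel
	Interval:               time.Minute,      // window over which MinBytesSent must have been sent
	ChecksPerInterval:      10,               // number of times to check per interval
	MinBytesSent:           1,                // minimum bytes that must flow over the interval
	RestartBackoff:         20 * time.Second, // wait before checking again for restarts
	MaxConsecutiveRestarts: 5,                // close the channel after this many zero-data restarts
	CompleteTimeout:        30 * time.Second, // assumed: max wait for the Complete event after the last block
})
dt, err := NewDataTransfer(ds, tmpDir, dtNet, transport, storedCounter, restartConf)
if err != nil {
	// handle configuration error
}
_ = dt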

@@ -318,30 +298,76 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
if err != nil {
return err
}

// Close the channel on the local transport
err = m.transport.CloseChannel(ctx, chid)
if err != nil {
log.Warnf("unable to close channel %s: %s", chid, err)
}

log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
// Send a cancel message to the remote peer
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
err = fmt.Errorf("Unable to send cancel message: %w", err)
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
chid, m.peerID, err)
_ = m.OnRequestDisconnected(ctx, chid)
log.Warn(err)
}

// Fire a cancel event
fsmerr := m.channels.Cancel(chid)
// If it wasn't possible to send a cancel message to the peer, return
// that error
if err != nil {
return err
}
// If it wasn't possible to fire a cancel event, return that error
if fsmerr != nil {
return xerrors.Errorf("unable to send cancel to channel FSM: %w", fsmerr)
}

return nil
}

// close an open channel and fire an error event
func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
log.Infof("close channel %s with error %s", chid, cherr)

chst, err := m.channels.GetByID(ctx, chid)
if err != nil {
return err
}

// Cancel the channel on the local transport
err = m.transport.CloseChannel(ctx, chid)
if err != nil {
log.Warnf("unable to close channel %s: %s", chid, err)
}

// Try to send a cancel message to the remote peer. It's quite likely
// we won't be able to send the message because the channel is already
// in an error state, probably due to connection issues, so if we can't
// send the message, just log a warning.
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
// Just log a warning here because it's important that we fire the
// error event with the original error so that it doesn't get masked
// by subsequent errors.
log.Warnf("unable to send cancel message for channel %s to peer %s: %w",
chid, m.peerID, err)
}

// Fire an error event
err = m.channels.Error(chid, cherr)
if err != nil {
return xerrors.Errorf("unable to send error %s to channel FSM: %w", cherr, err)
}

return nil
}
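
The monitor introduced by this commit can shut a channel down when its accept or complete watchdogs fire, which surfaces to subscribers as a Failed channel status. A minimal sketch of how a caller might watch for this, using the same subscription pattern as the integration test below (mgr stands in for the caller's datatransfer.Manager):

mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
	if channelState.Status() == datatransfer.Failed {
		// The channel was closed with an error, e.g. the push channel
		// monitor timed out waiting for an accept or complete event.
		log.Warnf("channel %s failed: %s", channelState.ChannelID(), event.Message)
	}
})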

// pause a running data transfer channel
func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("pause channel %s", chid)
88 changes: 68 additions & 20 deletions impl/integration_test.go
@@ -36,6 +36,7 @@ import (
. "github.com/filecoin-project/go-data-transfer/impl"
"github.com/filecoin-project/go-data-transfer/message"
"github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
"github.com/filecoin-project/go-data-transfer/testutil"
tp "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-data-transfer/transport/graphsync/extension"
@@ -526,33 +527,37 @@ func (dc *disconnectCoordinator) onDisconnect() {
// TestPushRequestAutoRestart tests that if the connection for a push request
// goes down, it will automatically restart (given the right config options)
func TestPushRequestAutoRestart(t *testing.T) {
//logging.SetLogLevel("dt-pushchanmon", "debug")

testCases := []struct {
name string
expectInitiatorDTFail bool
disconnectOnRequestComplete bool
registerResponder func(responder datatransfer.Manager, dc *disconnectCoordinator)
}{{
// Test what happens when the disconnect happens right when the
// responder receives the open channel request (ie the responder
// doesn't get a chance to respond to the open channel request)
name: "when responder receives incoming request",
// Verify that the client fires an error event when the disconnect
// occurs right when the responder receives the open channel request
// (ie the responder doesn't get a chance to respond to the open
// channel request)
name: "when responder receives incoming request",
expectInitiatorDTFail: true,
registerResponder: func(responder datatransfer.Manager, dc *disconnectCoordinator) {
subscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])

if event.Code == datatransfer.Open {
dc.signalReadyForDisconnect(true)
}
}
responder.SubscribeToEvents(subscriber)
},
}, {
// Test what happens when the disconnect happens right after the
// responder receives the first block
// Verify that if a disconnect happens right after the responder
// receives the first block, the transfer will complete automatically
// when the link comes back up
name: "when responder receives first block",
registerResponder: func(responder datatransfer.Manager, dc *disconnectCoordinator) {
rcvdCount := 0
subscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
//t.Logf("resp: %s / %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
if event.Code == datatransfer.DataReceived {
rcvdCount++
if rcvdCount == 1 {
@@ -563,11 +568,12 @@ func TestPushRequestAutoRestart(t *testing.T) {
responder.SubscribeToEvents(subscriber)
},
}, {
// Test what happens when the disconnect happens right before the
// requester sends the complete message (ie all blocks have been
// received but the responder doesn't get a chance to tell
// Verify that the client fires an error event when disconnect occurs
// right before the responder sends the complete message (ie all blocks
// have been received but the responder doesn't get a chance to tell
// the initiator before the disconnect)
name: "before requester sends complete message",
expectInitiatorDTFail: true,
disconnectOnRequestComplete: true,
}}
for _, tc := range testCases {
@@ -579,8 +585,8 @@ func TestPushRequestAutoRestart(t *testing.T) {
// Create an object to coordinate disconnect events
dc := newDisconnectCoordinator()

// If the test should disconnect before the request is complete,
// add a hook to do so
// If the test should disconnect just before the responder sends
// the Complete message, add a hook to do so
var responderTransportOpts []tp.Option
if tc.disconnectOnRequestComplete {
responderTransportOpts = []tp.Option{
@@ -599,14 +605,27 @@ func TestPushRequestAutoRestart(t *testing.T) {
initiatorGSTspt := gsData.SetupGSTransportHost1()
responderGSTspt := gsData.SetupGSTransportHost2(responderTransportOpts...)

restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond, 5)
restartConf := PushChannelRestartConfig(pushchannelmonitor.Config{
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesSent: 1,
ChecksPerInterval: 10,
RestartBackoff: 200 * time.Millisecond,
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
})
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, gsData.StoredCounter1, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, initiator)
responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt, gsData.StoredCounter2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, responder)

//initiator.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
// t.Logf("clnt: evt %s / status %s", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
//})

// Watch for successful completion
finished := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.Status() == datatransfer.Completed {
@@ -633,6 +652,16 @@ func TestPushRequestAutoRestart(t *testing.T) {
tc.registerResponder(responder, dc)
}

// If the initiator is expected to fail, watch for the Failed event
initiatorFailed := make(chan struct{})
if tc.expectInitiatorDTFail {
initiator.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.Status() == datatransfer.Failed {
close(initiatorFailed)
}
})
}

// Open a push channel
chid, err := initiator.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)
@@ -655,14 +684,25 @@ func TestPushRequestAutoRestart(t *testing.T) {
t.Logf("Sleep for a second")
time.Sleep(1 * time.Second)

// Restore connection
t.Logf("Restore connection")
// Restore link
t.Logf("Restore link")
require.NoError(t, gsData.Mn.LinkAll())
time.Sleep(200 * time.Millisecond)
conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
require.NoError(t, err)
require.NotNil(t, conn)

// If we're expecting a Failed event, verify that it occurs
if tc.expectInitiatorDTFail {
select {
case <-ctx.Done():
t.Fatal("Initiator data-transfer did not fail as expected")
return
case <-initiatorFailed:
t.Logf("Initiator data-transfer failed as expected")
return
}
}

// We're not expecting a failure event, so wait for the transfer to
// complete
t.Logf("Waiting for auto-restart on push channel %s", chid)

(func() {
@@ -1569,3 +1609,11 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,
incoming datatransfer.Request) {

}

//func SetDTLogLevelDebug() {
// _ = logging.SetLogLevel("dt-impl", "debug")
// _ = logging.SetLogLevel("dt-pushchanmon", "debug")
// _ = logging.SetLogLevel("dt_graphsync", "debug")
// _ = logging.SetLogLevel("data_transfer", "debug")
// _ = logging.SetLogLevel("data_transfer_network", "debug")
//}