Skip to content

Commit

Permalink
Use do-not-send-first-blocks extension for restarts (#257)
Browse files Browse the repository at this point in the history
* feat: do-not-send-first-blocks

* refactor: move received blocks total onto channel state

* fix: record stream protocol on stream open
  • Loading branch information
dirkmc authored Oct 4, 2021
1 parent b06ea85 commit 717d0bf
Show file tree
Hide file tree
Showing 33 changed files with 757 additions and 285 deletions.
2 changes: 1 addition & 1 deletion benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD

linkSystem := storeutil.LinkSystemForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs)
transport := gstransport.NewTransport(p, gs, dtNet)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
if err != nil {
return Instance{}, err
Expand Down
6 changes: 6 additions & 0 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ type mockChannelState struct {
complete bool
}

var _ datatransfer.ChannelState = (*mockChannelState)(nil)

func (m *mockChannelState) Queued() uint64 {
return m.queued
}
Expand Down Expand Up @@ -615,3 +617,7 @@ func (m *mockChannelState) ReceivedCids() []cid.Cid {
func (m *mockChannelState) ReceivedCidsLen() int {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsTotal() int64 {
panic("implement me")
}
12 changes: 11 additions & 1 deletion channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type channelState struct {
sent uint64
// total bytes received by this node (0 if sender)
received uint64
// number of blocks that have been received, including blocks that are
// present in more than one place in the DAG
receivedBlocksTotal int64
// more informative status on a channel
message string
// additional vouchers
Expand Down Expand Up @@ -107,7 +110,7 @@ func (c channelState) ReceivedCids() []cid.Cid {
return receivedCids
}

// ReceivedCids returns the number of cids received so far on this channel
// ReceivedCids returns the number of unique cids received so far on this channel
func (c channelState) ReceivedCidsLen() int {
len, err := c.receivedCids.Len(c.ChannelID())
if err != nil {
Expand All @@ -116,6 +119,12 @@ func (c channelState) ReceivedCidsLen() int {
return len
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) ReceivedCidsTotal() int64 {
return c.receivedBlocksTotal
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

Expand Down Expand Up @@ -213,6 +222,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
queued: c.Queued,
sent: c.Sent,
received: c.Received,
receivedBlocksTotal: c.ReceivedBlocksTotal,
message: c.Message,
vouchers: c.Vouchers,
voucherResults: c.VoucherResults,
Expand Down
28 changes: 23 additions & 5 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,27 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
}

// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) (bool, error) {
return c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta)
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64) (bool, error) {
if err := c.checkChannelExists(chid, datatransfer.DataReceived); err != nil {
return false, err
}

// Check if the block has already been seen
sid := seenCidsSetID(chid, datatransfer.DataReceived)
seen, err := c.seenCIDs.InsertSetCID(sid, k)
if err != nil {
return false, err
}

// If the block has not been seen before, fire the progress event
if !seen {
if err := c.stateMachines.Send(chid, datatransfer.DataReceivedProgress, delta); err != nil {
return false, err
}
}

// Fire the regular event
return !seen, c.stateMachines.Send(chid, datatransfer.DataReceived, index)
}

// PauseInitiator pauses the initator of this channel
Expand Down Expand Up @@ -358,6 +377,7 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
// blocks that have already been queued / sent / received
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
// Clean up seen block caches
progressStates := []datatransfer.EventCode{
datatransfer.DataQueued,
datatransfer.DataSent,
Expand Down Expand Up @@ -432,9 +452,7 @@ func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cids

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
rcr := &receivedCidsReader{
seenCIDs: c.seenCIDs,
}
rcr := &receivedCidsReader{seenCIDs: c.seenCIDs}
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
}

Expand Down
12 changes: 8 additions & 4 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,14 @@ var ChannelEvents = fsm.Events{
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, rcvdBlocksTotal int64) error {
if rcvdBlocksTotal > chst.ReceivedBlocksTotal {
chst.ReceivedBlocksTotal = rcvdBlocksTotal
}
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Received += delta
Expand Down
8 changes: 4 additions & 4 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(0), state.Sent())
require.Empty(t, state.ReceivedCids())

isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 1)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
require.True(t, isNew)
Expand All @@ -207,15 +207,15 @@ func TestChannels(t *testing.T) {
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())

// errors if channel does not exist
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200, 2)
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
require.False(t, isNew)
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())
require.False(t, isNew)

isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50)
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50, 2)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
require.True(t, isNew)
Expand All @@ -232,7 +232,7 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(100), state.Sent())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 3)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
Expand Down
3 changes: 3 additions & 0 deletions channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type ChannelState struct {
Message string
Vouchers []EncodedVoucher
VoucherResults []EncodedVoucherResult
// Number of blocks that have been received, including blocks that are
// present in more than one place in the DAG
ReceivedBlocksTotal int64

// Stages traces the execution fo a data transfer.
//
Expand Down
50 changes: 49 additions & 1 deletion channels/internal/internalchannel_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.6
github.com/ipfs/go-graphsync v0.9.1
github.com/ipfs/go-graphsync v0.10.0-rc2
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand All @@ -31,6 +31,7 @@ require (
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-record v0.1.1 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/stretchr/testify v1.6.1
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
go.uber.org/atomic v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.9.1 h1:jo7ZaAZ3lal89RhKxKoRkPzIO8lmOY6KUWA1mDRZ2+U=
github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
github.com/ipfs/go-graphsync v0.10.0-rc2 h1:nS0IolMkkDnTrNZFSLolT1Kd+IaUSXU89jqce4aGq54=
github.com/ipfs/go-graphsync v0.10.0-rc2/go.mod h1:kQJlkg1aE9HfCwp577BgPy/UxlordJ7ScIBO2IHwPZU=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
4 changes: 2 additions & 2 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
// It fires an event on the channel, updating the sum of received data and
// calls revalidators so they can pause / resume the channel or send a
// message over the transport.
func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, size uint64) error {
isNew, err := m.channels.DataReceived(chid, link.(cidlink.Link).Cid, size)
func (m *manager) OnDataReceived(chid datatransfer.ChannelID, link ipld.Link, size uint64, index int64) error {
isNew, err := m.channels.DataReceived(chid, link.(cidlink.Link).Cid, size, index)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ func TestDataTransferRestartInitiating(t *testing.T) {
testCids := testutil.GenerateCids(2)
ev, ok := h.dt.(datatransfer.EventsHandler)
require.True(t, ok)
require.NoError(t, ev.OnDataReceived(channelID, cidlink.Link{Cid: testCids[0]}, 12345))
require.NoError(t, ev.OnDataReceived(channelID, cidlink.Link{Cid: testCids[1]}, 12345))
require.NoError(t, ev.OnDataReceived(channelID, cidlink.Link{Cid: testCids[0]}, 12345, 1))
require.NoError(t, ev.OnDataReceived(channelID, cidlink.Link{Cid: testCids[1]}, 12345, 2))

// restart that pull channel
err = h.dt.RestartDataTransferChannel(ctx, channelID)
Expand All @@ -393,7 +393,7 @@ func TestDataTransferRestartInitiating(t *testing.T) {
require.Equal(t, openChannel.Selector, h.stor)
require.True(t, openChannel.Message.IsRequest())
// received cids should be a part of the channel req
require.ElementsMatch(t, []cid.Cid{testCids[0], testCids[1]}, openChannel.DoNotSendCids)
require.ElementsMatch(t, []cid.Cid{testCids[0], testCids[1]}, openChannel.Channel.ReceivedCids())

receivedRequest, ok := openChannel.Message.(datatransfer.Request)
require.True(t, ok)
Expand Down
Loading

0 comments on commit 717d0bf

Please sign in to comment.