Skip to content

Commit

Permalink
test: check total blocks sent when theres a restart
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 28, 2021
1 parent 2caf7c1 commit a977980
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 48 deletions.
92 changes: 84 additions & 8 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"errors"
"time"

"github.com/ipfs/go-datastore/namespace"

"github.com/filecoin-project/go-data-transfer/cidsets"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -53,6 +57,7 @@ type Channels struct {
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
cidLists cidlists.CIDLists
seenCIDs *cidsets.CIDSetManager
}

// ChannelEnvironment -- just a proxy for DTNetwork for now
Expand All @@ -71,8 +76,11 @@ func New(ds datastore.Batching,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {

seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
c := &Channels{
cidLists: cidLists,
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
Expand Down Expand Up @@ -117,6 +125,19 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
}

c.notifier(evt, fromInternalChannelState(realChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList))

// When the channel has been cleaned up, remove the caches of seen cids
if evt.Code == datatransfer.CleanupComplete {
chid := datatransfer.ChannelID{
Initiator: realChannel.Initiator,
Responder: realChannel.Responder,
ID: realChannel.TransferID,
}
err := c.removeSeenCIDCaches(chid)
if err != nil {
log.Errorf("failed to clean up channel %s: %s", err)
}
}
}

// CreateNew creates a new channel id and channel state and saves to channels.
Expand Down Expand Up @@ -206,20 +227,33 @@ func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
}

func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataSent, delta)
func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
if err := c.onProgress(chid, datatransfer.DataSentProgress, k, delta); err != nil {
return err
}

return c.send(chid, datatransfer.DataSent)
}

func (c *Channels) DataQueued(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataQueued, delta)
func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
if err := c.onProgress(chid, datatransfer.DataQueuedProgress, k, delta); err != nil {
return err
}

return c.send(chid, datatransfer.DataQueued)
}

func (c *Channels) DataReceived(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
err := c.cidLists.AppendList(chid, cid)
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
err := c.cidLists.AppendList(chid, k)
if err != nil {
return err
}
return c.send(chid, datatransfer.DataReceived, delta)

if err := c.onProgress(chid, datatransfer.DataReceivedProgress, k, delta); err != nil {
return err
}

return c.send(chid, datatransfer.DataReceived)
}

// PauseInitiator pauses the initator of this channel
Expand Down Expand Up @@ -304,7 +338,49 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
}

func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
progressStates := []datatransfer.EventCode{
datatransfer.DataQueuedProgress,
datatransfer.DataSentProgress,
datatransfer.DataReceivedProgress,
}
for _, evt := range progressStates {
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
err := c.seenCIDs.DeleteSet(sid)
if err != nil {
return err
}
}
return nil
}

func (c *Channels) onProgress(chid datatransfer.ChannelID, evt datatransfer.EventCode, k cid.Cid, delta uint64) error {
err := c.checkChannelExists(chid, evt)
if err != nil {
return err
}

sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
seen, err := c.seenCIDs.InsertSetCID(sid, k)
if err != nil {
return err
}
if seen {
return nil
}

return c.stateMachines.Send(chid, evt, delta)
}

func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode, args ...interface{}) error {
err := c.checkChannelExists(chid, code)
if err != nil {
return err
}
return c.stateMachines.Send(chid, code, args...)
}

func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatransfer.EventCode) error {
has, err := c.stateMachines.Has(chid)
if err != nil {
return err
Expand All @@ -313,5 +389,5 @@ func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode
return xerrors.Errorf("cannot send FSM event %s to data-transfer channel %s: %w",
datatransfer.Events[code], chid, NewErrNotFound(chid))
}
return c.stateMachines.Send(chid, code, args...)
return nil
}
61 changes: 28 additions & 33 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import (

var log = logging.Logger("data-transfer")

var transferringStates = []fsm.StateKey{
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing,
}

// ChannelEvents describe the events taht can
var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
Expand All @@ -23,40 +33,25 @@ var ChannelEvents = fsm.Events{

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

fsm.Event(datatransfer.DataReceived).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Received += delta
return nil
}),
fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Received += delta
return nil
}),

fsm.Event(datatransfer.DataSent).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Sent += delta
return nil
}),
fsm.Event(datatransfer.DataQueued).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Queued += delta
return nil
}),
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Sent += delta
return nil
}),
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Queued += delta
return nil
}),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()
return nil
Expand Down
91 changes: 91 additions & 0 deletions cidsets/cidsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cidsets

import (
"sync"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"

"github.com/ipfs/go-cid"
)

type SetID string

type CIDSetManager struct {
ds datastore.Datastore
}

func NewCIDSetManager(ds datastore.Datastore) *CIDSetManager {
return &CIDSetManager{ds: ds}
}

func (mgr *CIDSetManager) InsertSetCID(sid SetID, c cid.Cid) (bool, error) {
return mgr.getSet(sid).Insert(c)
}

func (mgr *CIDSetManager) DeleteSet(sid SetID) error {
return mgr.getSet(sid).Truncate()
}

func (mgr *CIDSetManager) getSet(sid SetID) *cidSet {
// TODO: cache the wrapped DS instead of creating a new one every time
return NewCIDSet(mgr.getSetDS(sid))
}

func (mgr *CIDSetManager) getSetDS(sid SetID) datastore.Batching {
return namespace.Wrap(mgr.ds, datastore.NewKey("/"+string(sid)+"/cids/"))
}

type cidSet struct {
lk sync.Mutex
ds datastore.Batching
}

func NewCIDSet(ds datastore.Batching) *cidSet {
return &cidSet{ds: ds}
}

func (s *cidSet) Insert(c cid.Cid) (bool, error) {
s.lk.Lock()
defer s.lk.Unlock()

k := datastore.NewKey(c.String())
has, err := s.ds.Has(k)
if err != nil {
return false, err
}
if has {
return true, nil
}
return false, s.ds.Put(k, nil)
}

func (s *cidSet) Truncate() error {
s.lk.Lock()
defer s.lk.Unlock()

res, err := s.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return err
}

entries, err := res.Rest()
if err != nil {
return err
}

batched, err := s.ds.Batch()
if err != nil {
return err
}

for _, entry := range entries {
err := batched.Delete(datastore.NewKey(entry.Key))
if err != nil {
return err
}
}

return batched.Commit()
}
47 changes: 47 additions & 0 deletions cidsets/cidsets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cidsets

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-data-transfer/testutil"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
)

func TestCIDSetManager(t *testing.T) {
cid1 := testutil.GenerateCids(1)[0]

dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
mgr := NewCIDSetManager(dstore)
setID1 := SetID("set1")
setID2 := SetID("set2")

exists, err := mgr.InsertSetCID(setID1, cid1)
require.NoError(t, err)
require.False(t, exists)

exists, err = mgr.InsertSetCID(setID1, cid1)
require.NoError(t, err)
require.True(t, exists)

exists, err = mgr.InsertSetCID(setID2, cid1)
require.NoError(t, err)
require.False(t, exists)

exists, err = mgr.InsertSetCID(setID2, cid1)
require.NoError(t, err)
require.True(t, exists)

err = mgr.DeleteSet(setID1)
require.NoError(t, err)

exists, err = mgr.InsertSetCID(setID1, cid1)
require.NoError(t, err)
require.False(t, exists)

exists, err = mgr.InsertSetCID(setID2, cid1)
require.NoError(t, err)
require.True(t, exists)
}
7 changes: 7 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ const (

// DataQueued is emmited is read and queued for sending to the remote peer
DataQueued

DataQueuedProgress
DataSentProgress
DataReceivedProgress
)

// Events are human readable names for data transfer events
Expand All @@ -97,6 +101,9 @@ var Events = map[EventCode]string{
Complete: "Complete",
CompleteCleanupOnRestart: "CompleteCleanupOnRestart",
DataQueued: "DataQueued",
DataQueuedProgress: "DataQueuedProgress",
DataSentProgress: "DataSentProgress",
DataReceivedProgress: "DataReceivedProgress",
}

// Event is a struct containing information about a data transfer event
Expand Down
Loading

0 comments on commit a977980

Please sign in to comment.