Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Mar 25, 2022
1 parent 2adcb65 commit 078074c
Show file tree
Hide file tree
Showing 15 changed files with 488 additions and 386 deletions.
19 changes: 8 additions & 11 deletions channels/caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ func (bic *blockIndexCache) updateIfGreater(evt datatransfer.EventCode, chid dat
}

type progressState struct {
dataLimitPresent bool
dataLimit uint64
progress *uint64
dataLimit uint64
progress *uint64
}

type readProgressFn func(datatransfer.ChannelID) (dataLimitPresent bool, dataLimit uint64, progress uint64, err error)
type readProgressFn func(datatransfer.ChannelID) (dataLimit uint64, progress uint64, err error)

type progressCache struct {
lk sync.RWMutex
Expand All @@ -82,7 +81,7 @@ func newProgressCache() *progressCache {
}
}

func (pc *progressCache) getValue(chid datatransfer.ChannelID, readFromOriginal readProgressFn) (progressState, error) {
func (pc *progressCache) getValue(chid datatransfer.ChannelID, readProgress readProgressFn) (progressState, error) {
pc.lk.RLock()
value, ok := pc.values[chid]
pc.lk.RUnlock()
Expand All @@ -95,14 +94,13 @@ func (pc *progressCache) getValue(chid datatransfer.ChannelID, readFromOriginal
if ok {
return value, nil
}
dataLimitPresent, dataLimit, progress, err := readFromOriginal(chid)
dataLimit, progress, err := readProgress(chid)
if err != nil {
return progressState{}, err
}
newValue := progressState{
dataLimit: dataLimit,
dataLimitPresent: dataLimitPresent,
progress: &progress,
dataLimit: dataLimit,
progress: &progress,
}
pc.values[chid] = newValue
return newValue, nil
Expand All @@ -114,7 +112,7 @@ func (pc *progressCache) progress(chid datatransfer.ChannelID, additionalData ui
return false, err
}
total := atomic.AddUint64(state.progress, additionalData)
return state.dataLimitPresent && total >= state.dataLimit, nil
return state.dataLimit != 0 && total >= state.dataLimit, nil
}

func (pc *progressCache) setDataLimit(chid datatransfer.ChannelID, newLimit uint64) {
Expand All @@ -130,7 +128,6 @@ func (pc *progressCache) setDataLimit(chid datatransfer.ChannelID, newLimit uint
if !ok {
return
}
value.dataLimitPresent = true
value.dataLimit = newLimit
pc.values[chid] = value
}
143 changes: 46 additions & 97 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,87 +8,46 @@ import (
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels/internal"
)

// channelState is immutable channel data plus mutable state
type channelState struct {
// peerId of the manager peer
selfPeer peer.ID
// an identifier for this channel shared by request and responder, set by requester through protocol
transferID datatransfer.TransferID
// base CID for the piece being transferred
baseCid cid.Cid
// portion of Piece to return, specified by an IPLD selector
selector *cbg.Deferred
// the party that is sending the data (not who initiated the request)
sender peer.ID
// the party that is receiving the data (not who initiated the request)
recipient peer.ID
// expected amount of data to be transferred
totalSize uint64
// current status of this deal
status datatransfer.Status
// isPull indicates if this is a push or pull request
isPull bool
// total bytes read from this node and queued for sending (0 if receiver)
queued uint64
// total bytes sent from this node (0 if receiver)
sent uint64
// total bytes received by this node (0 if sender)
received uint64
// number of blocks that have been received, including blocks that are
// present in more than one place in the DAG
receivedBlocksTotal int64
// Number of blocks that have been queued, including blocks that are
// present in more than one place in the DAG
queuedBlocksTotal int64
// Number of blocks that have been sent, including blocks that are
// present in more than one place in the DAG
sentBlocksTotal int64
// more informative status on a channel
message string
// additional vouchers
vouchers []internal.EncodedVoucher
ic internal.ChannelState

// additional voucherResults
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc

// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
stages *datatransfer.ChannelStages
}

// EmptyChannelState is the zero value for channel state, meaning not present
var EmptyChannelState = channelState{}

// Status is the current status of this channel
func (c channelState) Status() datatransfer.Status { return c.status }
func (c channelState) Status() datatransfer.Status { return c.ic.Status }

// Received returns the number of bytes received
func (c channelState) Queued() uint64 { return c.queued }
func (c channelState) Queued() uint64 { return c.ic.Queued }

// Sent returns the number of bytes sent
func (c channelState) Sent() uint64 { return c.sent }
func (c channelState) Sent() uint64 { return c.ic.Sent }

// Received returns the number of bytes received
func (c channelState) Received() uint64 { return c.received }
func (c channelState) Received() uint64 { return c.ic.Received }

// TransferID returns the transfer id for this channel
func (c channelState) TransferID() datatransfer.TransferID { return c.transferID }
func (c channelState) TransferID() datatransfer.TransferID { return c.ic.TransferID }

// BaseCID returns the CID that is at the root of this data transfer
func (c channelState) BaseCID() cid.Cid { return c.baseCid }
func (c channelState) BaseCID() cid.Cid { return c.ic.BaseCid }

// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() ipld.Node {
builder := basicnode.Prototype.Any.NewBuilder()
reader := bytes.NewReader(c.selector.Raw)
reader := bytes.NewReader(c.ic.Selector.Raw)
err := dagcbor.Decode(builder, reader)
if err != nil {
log.Error(err)
Expand All @@ -98,60 +57,60 @@ func (c channelState) Selector() ipld.Node {

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() datatransfer.Voucher {
if len(c.vouchers) == 0 {
if len(c.ic.Vouchers) == 0 {
return nil
}
decoder, _ := c.voucherDecoder(c.vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[0].Voucher.Raw)
decoder, _ := c.voucherDecoder(c.ic.Vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[0].Voucher.Raw)
return encodable.(datatransfer.Voucher)
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) ReceivedCidsTotal() int64 {
return c.receivedBlocksTotal
return c.ic.ReceivedBlocksTotal
}

// QueuedCidsTotal returns the number of (non-unique) cids queued so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) QueuedCidsTotal() int64 {
return c.queuedBlocksTotal
return c.ic.QueuedBlocksTotal
}

// SentCidsTotal returns the number of (non-unique) cids sent so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) SentCidsTotal() int64 {
return c.sentBlocksTotal
return c.ic.SentBlocksTotal
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }
func (c channelState) Sender() peer.ID { return c.ic.Sender }

// Recipient returns the peer id for the node that is receiving data
func (c channelState) Recipient() peer.ID { return c.recipient }
func (c channelState) Recipient() peer.ID { return c.ic.Recipient }

// TotalSize returns the total size for the data being transferred
func (c channelState) TotalSize() uint64 { return c.totalSize }
func (c channelState) TotalSize() uint64 { return c.ic.TotalSize }

// IsPull returns whether this is a pull request based on who initiated it
func (c channelState) IsPull() bool {
return c.isPull
return c.ic.Initiator == c.ic.Recipient
}

func (c channelState) ChannelID() datatransfer.ChannelID {
if c.isPull {
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.recipient, Responder: c.sender}
if c.ic.Initiator == c.ic.Recipient {
return datatransfer.ChannelID{ID: c.ic.TransferID, Initiator: c.ic.Recipient, Responder: c.ic.Sender}
}
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.sender, Responder: c.recipient}
return datatransfer.ChannelID{ID: c.ic.TransferID, Initiator: c.ic.Sender, Responder: c.ic.Recipient}
}

func (c channelState) Message() string {
return c.message
return c.ic.Message
}

func (c channelState) Vouchers() []datatransfer.Voucher {
vouchers := make([]datatransfer.Voucher, 0, len(c.vouchers))
for _, encoded := range c.vouchers {
vouchers := make([]datatransfer.Voucher, 0, len(c.ic.Vouchers))
for _, encoded := range c.ic.Vouchers {
decoder, _ := c.voucherDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
Expand All @@ -160,20 +119,20 @@ func (c channelState) Vouchers() []datatransfer.Voucher {
}

func (c channelState) LastVoucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.vouchers[len(c.vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[len(c.vouchers)-1].Voucher.Raw)
decoder, _ := c.voucherDecoder(c.ic.Vouchers[len(c.ic.Vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[len(c.ic.Vouchers)-1].Voucher.Raw)
return encodable.(datatransfer.Voucher)
}

func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
decoder, _ := c.voucherResultDecoder(c.voucherResults[len(c.voucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.voucherResults[len(c.voucherResults)-1].VoucherResult.Raw)
decoder, _ := c.voucherResultDecoder(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].VoucherResult.Raw)
return encodable.(datatransfer.VoucherResult)
}

func (c channelState) VoucherResults() []datatransfer.VoucherResult {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.voucherResults))
for _, encoded := range c.voucherResults {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.ic.VoucherResults))
for _, encoded := range c.ic.VoucherResults {
decoder, _ := c.voucherResultDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
Expand All @@ -182,14 +141,22 @@ func (c channelState) VoucherResults() []datatransfer.VoucherResult {
}

func (c channelState) SelfPeer() peer.ID {
return c.selfPeer
return c.ic.SelfPeer
}

func (c channelState) OtherPeer() peer.ID {
if c.sender == c.selfPeer {
return c.recipient
if c.ic.Sender == c.ic.SelfPeer {
return c.ic.Recipient
}
return c.sender
return c.ic.Sender
}

func (c channelState) DataLimit() uint64 {
return c.ic.DataLimit
}

func (c channelState) RevalidateToComplete() bool {
return c.ic.RevalidateToComplete
}

// Stages returns the current ChannelStages object, or an empty object.
Expand All @@ -198,38 +165,20 @@ func (c channelState) OtherPeer() peer.ID {
//
// EXPERIMENTAL; subject to change.
func (c channelState) Stages() *datatransfer.ChannelStages {
if c.stages == nil {
if c.ic.Stages == nil {
// return an empty placeholder; it will be discarded because the caller
// is not supposed to mutate the value anyway.
return &datatransfer.ChannelStages{}
}

return c.stages
return c.ic.Stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
transferID: c.TransferID,
baseCid: c.BaseCid,
selector: c.Selector,
sender: c.Sender,
recipient: c.Recipient,
totalSize: c.TotalSize,
status: c.Status,
queued: c.Queued,
sent: c.Sent,
received: c.Received,
receivedBlocksTotal: c.ReceivedBlocksTotal,
queuedBlocksTotal: c.QueuedBlocksTotal,
sentBlocksTotal: c.SentBlocksTotal,
message: c.Message,
vouchers: c.Vouchers,
voucherResults: c.VoucherResults,
ic: c,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
stages: c.Stages,
}
}

Expand Down
Loading

0 comments on commit 078074c

Please sign in to comment.