Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Find out what's up with data transfers! #3162

Merged
merged 4 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ type FullNode interface {
ClientGenCar(ctx context.Context, ref FileRef, outpath string) error
// ClientDealSize calculates real deal data size
ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error)
// ClientListTransfers returns the status of all ongoing transfers of data
ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)

// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)
Expand Down
40 changes: 25 additions & 15 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,23 @@ type FullNodeStruct struct {
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
WalletDelete func(context.Context, address.Address) error `perm:"write"`

ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`

StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
Expand Down Expand Up @@ -449,6 +451,14 @@ func (c *FullNodeStruct) ClientDealSize(ctx context.Context, root cid.Cid) (api.
return c.Internal.ClientDealSize(ctx, root)
}

func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return c.Internal.ClientListDataTransfers(ctx)
}

func (c *FullNodeStruct) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return c.Internal.ClientDataTransferUpdates(ctx)
}

func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) {
return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk)
Expand Down
15 changes: 15 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package api

import (
"encoding/json"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"

"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -98,3 +101,15 @@ func (ms *MessageSendSpec) Get() MessageSendSpec {

return *ms
}

type DataTransferChannel struct {
TransferID datatransfer.TransferID
Status datatransfer.Status
BaseCID cid.Cid
IsInitiator bool
IsSender bool
Voucher string
Message string
OtherPeer peer.ID
Transferred uint64
}
171 changes: 171 additions & 0 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package cli
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"text/tabwriter"
"time"

tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/fatih/color"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
Expand Down Expand Up @@ -76,6 +79,7 @@ var clientCmd = &cli.Command{
WithCategory("util", clientCommPCmd),
WithCategory("util", clientCarGenCmd),
WithCategory("util", clientInfoCmd),
WithCategory("util", clientListTransfers),
},
}

Expand Down Expand Up @@ -1203,3 +1207,170 @@ var clientInfoCmd = &cli.Command{
return nil
},
}

var clientListTransfers = &cli.Command{
Name: "list-transfers",
Usage: "List ongoing data transfers for deals",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed data transfers",
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

channels, err := api.ClientListDataTransfers(ctx)
if err != nil {
return err
}

completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

if watch {
channelUpdates, err := api.ClientDataTransferUpdates(ctx)
if err != nil {
return err
}

for {
tm.Clear() // Clear current screen

tm.MoveCursor(1, 1)

outputChannels(tm.Screen, channels, completed, color)

tm.Flush()

select {
case <-ctx.Done():
return nil
case channelUpdate := <-channelUpdates:
var found bool
for i, existing := range channels {
if existing.TransferID == channelUpdate.TransferID &&
existing.OtherPeer == channelUpdate.OtherPeer &&
existing.IsSender == channelUpdate.IsSender &&
existing.IsInitiator == channelUpdate.IsInitiator {
channels[i] = channelUpdate
found = true
break
}
}
if !found {
channels = append(channels, channelUpdate)
}
}
}
}
outputChannels(os.Stdout, channels, completed, color)
return nil
},
}

func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})

var receivingChannels, sendingChannels []lapi.DataTransferChannel
for _, channel := range channels {
if !completed && channel.Status == datatransfer.Completed {
continue
}
if channel.IsSender {
sendingChannels = append(sendingChannels, channel)
} else {
receivingChannels = append(receivingChannels, channel)
}
}

fmt.Fprintf(out, "Sending Channels\n\n")
w := tablewriter.New(tablewriter.Col("ID"),
tablewriter.Col("Status"),
tablewriter.Col("Sending To"),
tablewriter.Col("Root Cid"),
tablewriter.Col("Initiated?"),
tablewriter.Col("Transferred"),
tablewriter.Col("Voucher"),
tablewriter.NewLineCol("Message"))
for _, channel := range sendingChannels {
w.Write(toChannelOutput(color, "Sending To", channel))
}
w.Flush(out)

fmt.Fprintf(out, "\nReceiving Channels\n\n")
w = tablewriter.New(tablewriter.Col("ID"),
tablewriter.Col("Status"),
tablewriter.Col("Receiving From"),
tablewriter.Col("Root Cid"),
tablewriter.Col("Initiated?"),
tablewriter.Col("Transferred"),
tablewriter.Col("Voucher"),
tablewriter.NewLineCol("Message"))
for _, channel := range receivingChannels {
w.Write(toChannelOutput(color, "Receiving From", channel))
}
w.Flush(out)
}

func channelStatusString(useColor bool, status datatransfer.Status) string {
s := datatransfer.Statuses[status]
if !useColor {
return s
}

switch status {
case datatransfer.Failed, datatransfer.Cancelled:
return color.RedString(s)
case datatransfer.Completed:
return color.GreenString(s)
default:
return s
}
}

func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} {
rootCid := channel.BaseCID.String()
rootCid = "..." + rootCid[len(rootCid)-8:]

otherParty := channel.OtherPeer.String()
otherParty = "..." + otherParty[len(otherParty)-8:]

initiated := "N"
if channel.IsInitiator {
initiated = "Y"
}

voucher := channel.Voucher
if len(voucher) > 40 {
voucher = "..." + voucher[len(voucher)-37:]
}

return map[string]interface{}{
"ID": channel.TransferID,
"Status": channelStatusString(useColor, channel.Status),
otherPartyColumn: otherParty,
"Root Cid": rootCid,
"Initiated?": initiated,
"Transferred": channel.Transferred,
"Voucher": voucher,
"Message": channel.Message,
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
github.com/coreos/go-systemd/v22 v22.0.0
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/dgraph-io/badger/v2 v2.0.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 h1:gfAMKE626QEuKG3si0pdTRcr/YEbBoxY+3GOH3gWvl4=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
Expand Down
Loading