From f511cc188af48b9e97d6abafd4f218f3ec0140c4 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 18 Aug 2020 16:26:21 -0700 Subject: [PATCH 1/4] feat(cli): data transfer command add command to monitor data transfers --- api/api_full.go | 3 +- api/apistruct/struct.go | 5 ++ api/types.go | 15 ++++++ cli/client.go | 103 +++++++++++++++++++++++++++++++++++++ node/impl/client/client.go | 38 ++++++++++++++ 5 files changed, 163 insertions(+), 1 deletion(-) diff --git a/api/api_full.go b/api/api_full.go index 6feeb12cf0..3e26832418 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -256,7 +256,8 @@ type FullNode interface { ClientGenCar(ctx context.Context, ref FileRef, outpath string) error // ClientDealSize calculates real deal data size ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error) - + // ClientListTransfers returns the status of all ongoing transfers of data + ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 8fb8e68edc..ffa28be908 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -142,6 +142,7 @@ type FullNodeStruct struct { ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` @@ -449,6 +450,10 @@ func (c *FullNodeStruct) ClientDealSize(ctx context.Context, root cid.Cid) (api. return c.Internal.ClientDealSize(ctx, root) } +func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + return c.Internal.ClientListDataTransfers(ctx) +} + func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) { return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk) diff --git a/api/types.go b/api/types.go index 8a682db172..4b707dc9f2 100644 --- a/api/types.go +++ b/api/types.go @@ -2,10 +2,13 @@ package api import ( "encoding/json" + "github.com/filecoin-project/go-address" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -98,3 +101,15 @@ func (ms *MessageSendSpec) Get() MessageSendSpec { return *ms } + +type DataTransferChannel struct { + TransferID datatransfer.TransferID + Status datatransfer.Status + BaseCID cid.Cid + IsInitiator bool + IsSender bool + VoucherJSON string + Message string + OtherPeer peer.ID + Transferred uint64 +} diff --git a/cli/client.go b/cli/client.go index 519d11e5a5..fe4ed8c1f0 100644 --- a/cli/client.go +++ b/cli/client.go @@ -12,6 +12,7 @@ import ( "github.com/docker/go-units" "github.com/fatih/color" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" @@ -76,6 +77,7 @@ var clientCmd = &cli.Command{ WithCategory("util", clientCommPCmd), WithCategory("util", clientCarGenCmd), WithCategory("util", clientInfoCmd), + WithCategory("util", clientListTransfers), }, } @@ -1203,3 +1205,104 @@ var clientInfoCmd = &cli.Command{ return nil }, } + +var clientListTransfers = &cli.Command{ + Name: "list-transfers", + Usage: "Monitor ongoing data transfers for deals", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "color", + Usage: "use color in display output", + Value: true, + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + channels, err := api.ClientListDataTransfers(ctx) + if err != nil { + return err + } + + sort.Slice(channels, func(i, j int) bool { + return channels[i].TransferID < channels[j].TransferID + }) + + var receivingChannels, sendingChannels []lapi.DataTransferChannel + for _, channel := range channels { + if channel.IsSender { + sendingChannels = append(sendingChannels, channel) + } else { + receivingChannels = append(receivingChannels, channel) + } + } + + color := cctx.Bool("color") + + fmt.Fprintf(os.Stdout, "Sending Channels\n\n") + w := tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Other Party"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.NewLineCol("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range sendingChannels { + w.Write(toChannelOutput(color, channel)) + } + w.Flush(os.Stdout) + + fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n") + for _, channel := range receivingChannels { + + w.Write(toChannelOutput(color, channel)) + } + return w.Flush(os.Stdout) + }, +} + +func channelStatusString(useColor bool, status datatransfer.Status) string { + s := datatransfer.Statuses[status] + if !useColor { + return s + } + + switch status { + case datatransfer.Failed, datatransfer.Cancelled: + return color.RedString(s) + case datatransfer.Completed: + return color.GreenString(s) + default: + return s + } +} + +func toChannelOutput(useColor bool, channel api.DataTransferChannel) map[string]interface{} { + rootCid := channel.BaseCID.String() + rootCid = "..." + rootCid[len(rootCid)-8:] + + otherParty := channel.OtherPeer.String() + otherParty = "..." + otherParty[len(otherParty)-8:] + + initiated := "N" + if channel.IsInitiator { + initiated = "Y" + } + + return map[string]interface{}{ + "ID": channel.TransferID, + "Status": channelStatusString(useColor, channel.Status), + "Other Party": otherParty, + "Root Cid": rootCid, + "Initiated?": initiated, + "Transferred": channel.Transferred, + "Voucher": channel.VoucherJSON, + "Message": channel.Message, + } +} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index d9ece2d9e6..38696431b3 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,6 +2,7 @@ package client import ( "context" + "encoding/json" "fmt" "io" "os" @@ -24,6 +25,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" mh "github.com/multiformats/go-multihash" "go.uber.org/fx" @@ -74,6 +76,8 @@ type API struct { CombinedBstore dtypes.ClientBlockstore // TODO: try to remove RetrievalStoreMgr dtypes.ClientRetrievalStoreManager + DataTransfer dtypes.ClientDataTransfer + Host host.Host } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -755,3 +759,37 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multisto return nd.Cid(), nil } + +func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx) + if err != nil { + return nil, err + } + + apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) + for channelID, channelState := range inProgressChannels { + channel := api.DataTransferChannel{ + TransferID: channelState.TransferID(), + Status: channelState.Status(), + BaseCID: channelState.BaseCID(), + IsInitiator: channelID.Initiator == a.Host.ID(), + IsSender: channelState.Sender() == a.Host.ID(), + Message: channelState.Message(), + } + voucherJSON, err := json.Marshal(channelState.Voucher()) + if err != nil { + return nil, err + } + channel.VoucherJSON = string(voucherJSON) + if channel.IsSender { + channel.Transferred = channelState.Sent() + channel.OtherPeer = channelState.Recipient() + } else { + channel.Transferred = channelState.Received() + channel.OtherPeer = channelState.Sender() + } + apiChannels = append(apiChannels, channel) + } + + return apiChannels, nil +} From ef7f7375e70a9377f2d15f81676460f3764e8953 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 18 Aug 2020 16:43:05 -0700 Subject: [PATCH 2/4] fix(cli): cleanup data-transfers cleanup output for data transfers. --- cli/client.go | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/cli/client.go b/cli/client.go index fe4ed8c1f0..fac8915307 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1247,21 +1247,29 @@ var clientListTransfers = &cli.Command{ fmt.Fprintf(os.Stdout, "Sending Channels\n\n") w := tablewriter.New(tablewriter.Col("ID"), tablewriter.Col("Status"), - tablewriter.Col("Other Party"), + tablewriter.Col("Sending To"), tablewriter.Col("Root Cid"), tablewriter.Col("Initiated?"), tablewriter.Col("Transferred"), - tablewriter.NewLineCol("Voucher"), + tablewriter.Col("Voucher"), tablewriter.NewLineCol("Message")) for _, channel := range sendingChannels { - w.Write(toChannelOutput(color, channel)) + w.Write(toChannelOutput(color, "Sending To", channel)) } w.Flush(os.Stdout) fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n") + w = tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Receiving From"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) for _, channel := range receivingChannels { - w.Write(toChannelOutput(color, channel)) + w.Write(toChannelOutput(color, "Receiving From", channel)) } return w.Flush(os.Stdout) }, @@ -1283,7 +1291,7 @@ func channelStatusString(useColor bool, status datatransfer.Status) string { } } -func toChannelOutput(useColor bool, channel api.DataTransferChannel) map[string]interface{} { +func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} { rootCid := channel.BaseCID.String() rootCid = "..." + rootCid[len(rootCid)-8:] @@ -1295,14 +1303,19 @@ func toChannelOutput(useColor bool, channel api.DataTransferChannel) map[string] initiated = "Y" } + voucher := channel.VoucherJSON + if len(voucher) > 40 { + voucher = "..." + voucher[len(voucher)-37:] + } + return map[string]interface{}{ - "ID": channel.TransferID, - "Status": channelStatusString(useColor, channel.Status), - "Other Party": otherParty, - "Root Cid": rootCid, - "Initiated?": initiated, - "Transferred": channel.Transferred, - "Voucher": channel.VoucherJSON, - "Message": channel.Message, + "ID": channel.TransferID, + "Status": channelStatusString(useColor, channel.Status), + otherPartyColumn: otherParty, + "Root Cid": rootCid, + "Initiated?": initiated, + "Transferred": channel.Transferred, + "Voucher": voucher, + "Message": channel.Message, } } From 66ac7c195cb74646943ea8fb9b32e069ffa5517f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 18 Aug 2020 17:36:22 -0700 Subject: [PATCH 3/4] feat(cli): add updates to data transfer Add an API to get data transfer updates and add modify the CLI to be an ongoing monitoring plan --- api/api_full.go | 2 + api/apistruct/struct.go | 37 ++++++----- api/types.go | 2 +- cli/client.go | 122 +++++++++++++++++++++++++------------ go.mod | 1 + go.sum | 2 + node/impl/client/client.go | 75 +++++++++++++++++------ 7 files changed, 164 insertions(+), 77 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 3e26832418..72799c846f 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -258,6 +258,8 @@ type FullNode interface { ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error) // ClientListTransfers returns the status of all ongoing transfers of data ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) + ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) + // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index ffa28be908..282dd31160 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -127,22 +127,23 @@ type FullNodeStruct struct { WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletDelete func(context.Context, address.Address) error `perm:"write"` - ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` - ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` - ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` - ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` - ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` - ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` - ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` - ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` - ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` - ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` - ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` - ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` - ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` - ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` - ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` - ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` + ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` + ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` + ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` + ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` + ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` + ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` + ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` + ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` + ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` + ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` + ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` + ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` + ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` + ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` + ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` + ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` @@ -454,6 +455,10 @@ func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.Dat return c.Internal.ClientListDataTransfers(ctx) } +func (c *FullNodeStruct) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { + return c.Internal.ClientDataTransferUpdates(ctx) +} + func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) { return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk) diff --git a/api/types.go b/api/types.go index 4b707dc9f2..9a874d1c2f 100644 --- a/api/types.go +++ b/api/types.go @@ -108,7 +108,7 @@ type DataTransferChannel struct { BaseCID cid.Cid IsInitiator bool IsSender bool - VoucherJSON string + Voucher string Message string OtherPeer peer.ID Transferred uint64 diff --git a/cli/client.go b/cli/client.go index fac8915307..1b1072edd8 100644 --- a/cli/client.go +++ b/cli/client.go @@ -10,6 +10,7 @@ import ( "text/tabwriter" "time" + tm "github.com/buger/goterm" "github.com/docker/go-units" "github.com/fatih/color" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -1215,6 +1216,10 @@ var clientListTransfers = &cli.Command{ Usage: "use color in display output", Value: true, }, + &cli.BoolFlag{ + Name: "completed", + Usage: "show completed data transfers", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) @@ -1229,49 +1234,86 @@ var clientListTransfers = &cli.Command{ return err } - sort.Slice(channels, func(i, j int) bool { - return channels[i].TransferID < channels[j].TransferID - }) + channelUpdates, err := api.ClientDataTransferUpdates(ctx) + if err != nil { + return err + } - var receivingChannels, sendingChannels []lapi.DataTransferChannel - for _, channel := range channels { - if channel.IsSender { - sendingChannels = append(sendingChannels, channel) - } else { - receivingChannels = append(receivingChannels, channel) + for { + tm.Clear() // Clear current screen + + tm.MoveCursor(1, 1) + + sort.Slice(channels, func(i, j int) bool { + return channels[i].TransferID < channels[j].TransferID + }) + + completed := cctx.Bool("completed") + var receivingChannels, sendingChannels []lapi.DataTransferChannel + for _, channel := range channels { + if !completed && channel.Status == datatransfer.Completed { + continue + } + if channel.IsSender { + sendingChannels = append(sendingChannels, channel) + } else { + receivingChannels = append(receivingChannels, channel) + } } - } - color := cctx.Bool("color") + color := cctx.Bool("color") + + tm.Printf("Sending Channels\n\n") + w := tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Sending To"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range sendingChannels { + w.Write(toChannelOutput(color, "Sending To", channel)) + } + w.Flush(tm.Screen) + + fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n") + w = tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Receiving From"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range receivingChannels { + + w.Write(toChannelOutput(color, "Receiving From", channel)) + } + w.Flush(tm.Screen) + + tm.Flush() - fmt.Fprintf(os.Stdout, "Sending Channels\n\n") - w := tablewriter.New(tablewriter.Col("ID"), - tablewriter.Col("Status"), - tablewriter.Col("Sending To"), - tablewriter.Col("Root Cid"), - tablewriter.Col("Initiated?"), - tablewriter.Col("Transferred"), - tablewriter.Col("Voucher"), - tablewriter.NewLineCol("Message")) - for _, channel := range sendingChannels { - w.Write(toChannelOutput(color, "Sending To", channel)) - } - w.Flush(os.Stdout) - - fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n") - w = tablewriter.New(tablewriter.Col("ID"), - tablewriter.Col("Status"), - tablewriter.Col("Receiving From"), - tablewriter.Col("Root Cid"), - tablewriter.Col("Initiated?"), - tablewriter.Col("Transferred"), - tablewriter.Col("Voucher"), - tablewriter.NewLineCol("Message")) - for _, channel := range receivingChannels { - - w.Write(toChannelOutput(color, "Receiving From", channel)) - } - return w.Flush(os.Stdout) + select { + case <-ctx.Done(): + return nil + case channelUpdate := <-channelUpdates: + var found bool + for i, existing := range channels { + if existing.TransferID == channelUpdate.TransferID && + existing.OtherPeer == channelUpdate.OtherPeer && + existing.IsSender == channelUpdate.IsSender && + existing.IsInitiator == channelUpdate.IsInitiator { + channels[i] = channelUpdate + found = true + break + } + } + if !found { + channels = append(channels, channelUpdate) + } + } + } }, } @@ -1303,7 +1345,7 @@ func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTra initiated = "Y" } - voucher := channel.VoucherJSON + voucher := channel.Voucher if len(voucher) > 40 { voucher = "..." + voucher[len(voucher)-37:] } diff --git a/go.mod b/go.mod index aeb96ffbd4..d3c945c38c 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d + github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 github.com/coreos/go-systemd/v22 v22.0.0 github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e github.com/dgraph-io/badger/v2 v2.0.3 diff --git a/go.sum b/go.sum index 26966c5059..a916c60746 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 h1:gfAMKE626QEuKG3si0pdTRcr/YEbBoxY+3GOH3gWvl4= +github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 38696431b3..248b9ec6ac 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -7,6 +7,7 @@ import ( "io" "os" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi/big" "golang.org/x/xerrors" @@ -470,11 +471,15 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { if state.PayloadCID.Equals(order.Root) { - events <- marketevents.RetrievalEvent{ + select { + case <-ctx.Done(): + return + case events <- marketevents.RetrievalEvent{ Event: event, Status: state.Status, BytesReceived: state.TotalReceived, FundsSpent: state.FundsSpent, + }: } switch state.Status { @@ -767,29 +772,59 @@ func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferCh } apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) - for channelID, channelState := range inProgressChannels { - channel := api.DataTransferChannel{ - TransferID: channelState.TransferID(), - Status: channelState.Status(), - BaseCID: channelState.BaseCID(), - IsInitiator: channelID.Initiator == a.Host.ID(), - IsSender: channelState.Sender() == a.Host.ID(), - Message: channelState.Message(), + for _, channelState := range inProgressChannels { + apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState)) + } + + return apiChannels, nil +} + +func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { + channels := make(chan api.DataTransferChannel) + + unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) { + channel := toAPIChannel(a.Host.ID(), channelState) + select { + case <-ctx.Done(): + case channels <- channel: } + }) + + go func() { + defer unsub() + <-ctx.Done() + }() + + return channels, nil +} + +func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel { + channel := api.DataTransferChannel{ + TransferID: channelState.TransferID(), + Status: channelState.Status(), + BaseCID: channelState.BaseCID(), + IsSender: channelState.Sender() == hostID, + Message: channelState.Message(), + } + stringer, ok := channelState.Voucher().(fmt.Stringer) + if ok { + channel.Voucher = stringer.String() + } else { voucherJSON, err := json.Marshal(channelState.Voucher()) if err != nil { - return nil, err - } - channel.VoucherJSON = string(voucherJSON) - if channel.IsSender { - channel.Transferred = channelState.Sent() - channel.OtherPeer = channelState.Recipient() + channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error() } else { - channel.Transferred = channelState.Received() - channel.OtherPeer = channelState.Sender() + channel.Voucher = string(voucherJSON) } - apiChannels = append(apiChannels, channel) } - - return apiChannels, nil + if channel.IsSender { + channel.IsInitiator = !channelState.IsPull() + channel.Transferred = channelState.Sent() + channel.OtherPeer = channelState.Recipient() + } else { + channel.IsInitiator = channelState.IsPull() + channel.Transferred = channelState.Received() + channel.OtherPeer = channelState.Sender() + } + return channel } From 4fc5f9fe9f832c7dc362436e1040443a7ef11b66 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 18 Aug 2020 18:23:29 -0700 Subject: [PATCH 4/4] feat(cli): data transfer watch option make watching data transfer CLI updates only an option --- cli/client.go | 153 +++++++++++++++++++++++++++----------------------- 1 file changed, 83 insertions(+), 70 deletions(-) diff --git a/cli/client.go b/cli/client.go index 1b1072edd8..b3ee6220ba 100644 --- a/cli/client.go +++ b/cli/client.go @@ -3,6 +3,7 @@ package cli import ( "encoding/json" "fmt" + "io" "os" "path/filepath" "sort" @@ -1209,7 +1210,7 @@ var clientInfoCmd = &cli.Command{ var clientListTransfers = &cli.Command{ Name: "list-transfers", - Usage: "Monitor ongoing data transfers for deals", + Usage: "List ongoing data transfers for deals", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "color", @@ -1220,6 +1221,10 @@ var clientListTransfers = &cli.Command{ Name: "completed", Usage: "show completed data transfers", }, + &cli.BoolFlag{ + Name: "watch", + Usage: "watch deal updates in real-time, rather than a one time list", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) @@ -1234,89 +1239,97 @@ var clientListTransfers = &cli.Command{ return err } - channelUpdates, err := api.ClientDataTransferUpdates(ctx) - if err != nil { - return err - } - - for { - tm.Clear() // Clear current screen - - tm.MoveCursor(1, 1) - - sort.Slice(channels, func(i, j int) bool { - return channels[i].TransferID < channels[j].TransferID - }) + completed := cctx.Bool("completed") + color := cctx.Bool("color") + watch := cctx.Bool("watch") - completed := cctx.Bool("completed") - var receivingChannels, sendingChannels []lapi.DataTransferChannel - for _, channel := range channels { - if !completed && channel.Status == datatransfer.Completed { - continue - } - if channel.IsSender { - sendingChannels = append(sendingChannels, channel) - } else { - receivingChannels = append(receivingChannels, channel) - } + if watch { + channelUpdates, err := api.ClientDataTransferUpdates(ctx) + if err != nil { + return err } - color := cctx.Bool("color") + for { + tm.Clear() // Clear current screen - tm.Printf("Sending Channels\n\n") - w := tablewriter.New(tablewriter.Col("ID"), - tablewriter.Col("Status"), - tablewriter.Col("Sending To"), - tablewriter.Col("Root Cid"), - tablewriter.Col("Initiated?"), - tablewriter.Col("Transferred"), - tablewriter.Col("Voucher"), - tablewriter.NewLineCol("Message")) - for _, channel := range sendingChannels { - w.Write(toChannelOutput(color, "Sending To", channel)) - } - w.Flush(tm.Screen) - - fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n") - w = tablewriter.New(tablewriter.Col("ID"), - tablewriter.Col("Status"), - tablewriter.Col("Receiving From"), - tablewriter.Col("Root Cid"), - tablewriter.Col("Initiated?"), - tablewriter.Col("Transferred"), - tablewriter.Col("Voucher"), - tablewriter.NewLineCol("Message")) - for _, channel := range receivingChannels { + tm.MoveCursor(1, 1) - w.Write(toChannelOutput(color, "Receiving From", channel)) - } - w.Flush(tm.Screen) + outputChannels(tm.Screen, channels, completed, color) - tm.Flush() + tm.Flush() - select { - case <-ctx.Done(): - return nil - case channelUpdate := <-channelUpdates: - var found bool - for i, existing := range channels { - if existing.TransferID == channelUpdate.TransferID && - existing.OtherPeer == channelUpdate.OtherPeer && - existing.IsSender == channelUpdate.IsSender && - existing.IsInitiator == channelUpdate.IsInitiator { - channels[i] = channelUpdate - found = true - break + select { + case <-ctx.Done(): + return nil + case channelUpdate := <-channelUpdates: + var found bool + for i, existing := range channels { + if existing.TransferID == channelUpdate.TransferID && + existing.OtherPeer == channelUpdate.OtherPeer && + existing.IsSender == channelUpdate.IsSender && + existing.IsInitiator == channelUpdate.IsInitiator { + channels[i] = channelUpdate + found = true + break + } + } + if !found { + channels = append(channels, channelUpdate) } - } - if !found { - channels = append(channels, channelUpdate) } } } + outputChannels(os.Stdout, channels, completed, color) + return nil }, } +func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) { + sort.Slice(channels, func(i, j int) bool { + return channels[i].TransferID < channels[j].TransferID + }) + + var receivingChannels, sendingChannels []lapi.DataTransferChannel + for _, channel := range channels { + if !completed && channel.Status == datatransfer.Completed { + continue + } + if channel.IsSender { + sendingChannels = append(sendingChannels, channel) + } else { + receivingChannels = append(receivingChannels, channel) + } + } + + fmt.Fprintf(out, "Sending Channels\n\n") + w := tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Sending To"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range sendingChannels { + w.Write(toChannelOutput(color, "Sending To", channel)) + } + w.Flush(out) + + fmt.Fprintf(out, "\nReceiving Channels\n\n") + w = tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Receiving From"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range receivingChannels { + w.Write(toChannelOutput(color, "Receiving From", channel)) + } + w.Flush(out) +} + func channelStatusString(useColor bool, status datatransfer.Status) string { s := datatransfer.Statuses[status] if !useColor {