Skip to content

Commit

Permalink
address concerns from PR
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Mar 29, 2015
1 parent f0031af commit feb204d
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 67 deletions.
115 changes: 62 additions & 53 deletions core/commands/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import (

var StatsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Query IPFS daemon statistics",
Tagline: "Query IPFS statistics",
ShortDescription: ``,
},

Subcommands: map[string]*cmds.Command{
"bw": statBwCmd,
"bw-poll": statBwPollCmd,
"bw": statBwCmd,
},
}

Expand All @@ -36,6 +35,8 @@ var statBwCmd = &cmds.Command{
Options: []cmds.Option{
cmds.StringOption("peer", "p", "specify a peer to print bandwidth for"),
cmds.StringOption("proto", "t", "specify a protocol to print bandwidth for"),
cmds.BoolOption("poll", "specify a protocol to print bandwidth for"),
cmds.StringOption("interval", "i", "time interval to wait between updating output"),
},

Run: func(req cmds.Request, res cmds.Response) {
Expand Down Expand Up @@ -67,73 +68,64 @@ var statBwCmd = &cmds.Command{
return
}

var pid peer.ID
if pfound {
pid, err := peer.IDB58Decode(pstr)
checkpid, err := peer.IDB58Decode(pstr)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

stats := nd.Reporter.GetBandwidthForPeer(pid)
res.SetOutput(&stats)
} else if tfound {
pid := protocol.ID(tstr)
stats := nd.Reporter.GetBandwidthForProtocol(pid)
res.SetOutput(&stats)
} else {
totals := nd.Reporter.GetBandwidthTotals()
res.SetOutput(&totals)
pid = checkpid
}
},
Type: metrics.Stats{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
bs := res.Output().(*metrics.Stats)
out := new(bytes.Buffer)
fmt.Fprintln(out, "Bandwidth")
fmt.Fprintf(out, "TotalIn: %s\n", humanize.Bytes(uint64(bs.TotalIn)))
fmt.Fprintf(out, "TotalOut: %s\n", humanize.Bytes(uint64(bs.TotalOut)))
fmt.Fprintf(out, "RateIn: %s/s\n", humanize.Bytes(uint64(bs.RateIn)))
fmt.Fprintf(out, "RateOut: %s/s\n", humanize.Bytes(uint64(bs.RateOut)))
return out, nil
},
},
}

var statBwPollCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Print ipfs bandwidth information continuously",
ShortDescription: ``,
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.Context().GetNode()
interval := time.Second
timeS, found, err := req.Option("interval").String()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if found {
v, err := time.ParseDuration(timeS)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
interval = v
}

// Must be online!
if !nd.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
doPoll, _, err := req.Option("poll").Bool()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))

go func() {
defer close(out)
tick := time.NewTicker(time.Second)
for {
if pfound {
stats := nd.Reporter.GetBandwidthForPeer(pid)
out <- &stats
} else if tfound {
protoId := protocol.ID(tstr)
stats := nd.Reporter.GetBandwidthForProtocol(protoId)
out <- &stats
} else {
totals := nd.Reporter.GetBandwidthTotals()
out <- &totals
}
if !doPoll {
return
}
select {
case <-time.After(interval):
case <-req.Context().Context.Done():
return
case <-tick.C:
totals := nd.Reporter.GetBandwidthTotals()
out <- &totals
}
}
}()

res.SetOutput((<-chan interface{})(out))
},
Type: metrics.Stats{},
Marshalers: cmds.MarshalerMap{
Expand All @@ -143,22 +135,31 @@ var statBwPollCmd = &cmds.Command{
return nil, u.ErrCast()
}

polling, _, err := res.Request().Option("poll").Bool()
if err != nil {
return nil, err
}

first := true
marshal := func(v interface{}) (io.Reader, error) {
bs, ok := v.(*metrics.Stats)
if !ok {
return nil, u.ErrCast()
}
out := new(bytes.Buffer)
if first {
fmt.Fprintln(out, "Total Up\t Total Down\t Rate Up\t Rate Down")
first = false
if !polling {
printStats(out, bs)
} else {
if first {
fmt.Fprintln(out, "Total Up\t Total Down\t Rate Up\t Rate Down")
first = false
}
fmt.Fprint(out, "\r")
fmt.Fprintf(out, "%s \t\t", humanize.Bytes(uint64(bs.TotalOut)))
fmt.Fprintf(out, " %s \t\t", humanize.Bytes(uint64(bs.TotalIn)))
fmt.Fprintf(out, " %s/s \t", humanize.Bytes(uint64(bs.RateOut)))
fmt.Fprintf(out, " %s/s ", humanize.Bytes(uint64(bs.RateIn)))
}
fmt.Fprint(out, "\r")
fmt.Fprintf(out, "%s \t\t", humanize.Bytes(uint64(bs.TotalOut)))
fmt.Fprintf(out, " %s \t\t", humanize.Bytes(uint64(bs.TotalIn)))
fmt.Fprintf(out, " %s/s \t", humanize.Bytes(uint64(bs.RateOut)))
fmt.Fprintf(out, " %s/s ", humanize.Bytes(uint64(bs.RateIn)))
return out, nil

}
Expand All @@ -170,3 +171,11 @@ var statBwPollCmd = &cmds.Command{
},
},
}

func printStats(out io.Writer, bs *metrics.Stats) {
fmt.Fprintln(out, "Bandwidth")
fmt.Fprintf(out, "TotalIn: %s\n", humanize.Bytes(uint64(bs.TotalIn)))
fmt.Fprintf(out, "TotalOut: %s\n", humanize.Bytes(uint64(bs.TotalOut)))
fmt.Fprintf(out, "RateIn: %s/s\n", humanize.Bytes(uint64(bs.RateIn)))
fmt.Fprintf(out, "RateOut: %s/s\n", humanize.Bytes(uint64(bs.RateOut)))
}
4 changes: 3 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ func constructPeerHost(ctx context.Context, id peer.ID, ps peer.Peerstore, bwr m
return nil, debugerror.Wrap(err)
}

host := p2pbhost.New(network, bwr, p2pbhost.NATPortMap)
host := p2pbhost.New(network, p2pbhost.NATPortMap)
host.SetReporter(bwr)

return host, nil
}

Expand Down
12 changes: 10 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ type BasicHost struct {
}

// New constructs and sets up a new *BasicHost with given Network
func New(net inet.Network, bwc metrics.Reporter, opts ...Option) *BasicHost {
func New(net inet.Network, opts ...Option) *BasicHost {
h := &BasicHost{
network: net,
mux: protocol.NewMux(),
bwc: bwc,
bwc: metrics.NewBandwidthCounter(),
}

h.proc = goprocess.WithTeardown(func() error {
Expand Down Expand Up @@ -80,6 +80,14 @@ func New(net inet.Network, bwc metrics.Reporter, opts ...Option) *BasicHost {
return h
}

func (h *BasicHost) SetReporter(bwc metrics.Reporter) {
h.bwc = bwc
}

func (h *BasicHost) GetReporter() metrics.Reporter {
return h.bwc
}

// newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) {
h.ids.IdentifyConn(c)
Expand Down
6 changes: 3 additions & 3 deletions p2p/net/conn/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
key1 = nil
key2 = nil
}
l1, err := Listen(ctx, p1.Addr, p1.ID, key1, nil)
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func testDialer(t *testing.T, secure bool) {
}

ctx, cancel := context.WithCancel(context.Background())
l1, err := Listen(ctx, p1.Addr, p1.ID, key1, nil)
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
}

ctx, cancel := context.WithCancel(context.Background())
l1, err := Listen(ctx, p1.Addr, p1.ID, key1, nil)
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
Expand Down
13 changes: 11 additions & 2 deletions p2p/net/conn/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (l *listener) Loggable() map[string]interface{} {
}

// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey, connWrapper ConnWrapper) (Listener, error) {
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
ml, err := manetListen(addr)
if err != nil {
return nil, err
Expand All @@ -145,7 +145,6 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey
local: local,
privk: sk,
cg: ctxgroup.WithContext(ctx),
wrapper: connWrapper,
}
l.cg.SetTeardown(l.teardown)

Expand All @@ -154,6 +153,16 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey
return l, nil
}

type ListenerConnWrapper interface {
SetConnWrapper(ConnWrapper)
}

// SetConnWrapper assigns a maconn ConnWrapper to wrap all incoming
// connections with. MUST be set _before_ calling `Accept()`
func (l *listener) SetConnWrapper(cw ConnWrapper) {
l.wrapper = cw
}

func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
network, naddr, err := manet.DialArgs(addr)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"
"sync"

metrics "github.com/jbenet/go-ipfs/metrics"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
host "github.com/jbenet/go-ipfs/p2p/host"
bhost "github.com/jbenet/go-ipfs/p2p/host/basic"
Expand Down Expand Up @@ -67,7 +66,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
return nil, err
}

h := bhost.New(n, metrics.NewBandwidthCounter())
h := bhost.New(n)
log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a)

mn.cg.AddChild(n.cg)
Expand Down
10 changes: 7 additions & 3 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
log.Debugf("Swarm Listening at %s", maddr)
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk, func(c manet.Conn) manet.Conn {
return mconn.WrapConn(s.bwc, c)
})
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
if err != nil {
return err
}

if cw, ok := list.(conn.ListenerConnWrapper); ok {
cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
return mconn.WrapConn(s.bwc, c)
})
}

// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
sl, err := s.swarm.AddListener(list)
Expand Down
2 changes: 1 addition & 1 deletion p2p/test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func DivulgeAddresses(a, b inet.Network) {

func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost {
n := GenSwarmNetwork(t, ctx)
return bhost.New(n, metrics.NewBandwidthCounter())
return bhost.New(n)
}

var RandPeerID = tu.RandPeerID

0 comments on commit feb204d

Please sign in to comment.