Skip to content

Commit

Permalink
update the peer protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
xgfone committed Mar 4, 2023
1 parent 7fa1fa8 commit fad4a74
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 359 deletions.
219 changes: 3 additions & 216 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

A pure golang implementation of [BitTorrent](http://bittorrent.org/beps/bep_0000.html) library, which is inspired by [dht](https://github.com/shiyanhui/dht) and [torrent](https://github.com/anacrolix/torrent).


## Install

```shell
$ go get -u github.com/xgfone/bt
```


## Features

- Support `Go1.9+`.
Expand All @@ -17,8 +16,8 @@ $ go get -u github.com/xgfone/bt
- Pure Go implementation without `CGO`.
- Only library without any denpendencies. For the command tools, see [bttools](https://github.com/xgfone/bttools).


## The Implemented Specifications

- [x] [**BEP 03:** The BitTorrent Protocol Specification](http://bittorrent.org/beps/bep_0003.html)
- [x] [**BEP 05:** DHT Protocol](http://bittorrent.org/beps/bep_0005.html)
- [x] [**BEP 06:** Fast Extension](http://bittorrent.org/beps/bep_0006.html)
Expand All @@ -37,218 +36,6 @@ $ go get -u github.com/xgfone/bt
- [ ] [**BEP 44:** Storing arbitrary data in the DHT](http://bittorrent.org/beps/bep_0044.html)
- [x] [**BEP 48:** Tracker Protocol Extension: Scrape](http://bittorrent.org/beps/bep_0048.html)


## Example
See [godoc](https://pkg.go.dev/github.com/xgfone/bt) or [bttools](https://github.com/xgfone/bttools).

### Example 1: Download the file from the remote peer

```go
package main

import (
"context"
"flag"
"fmt"
"io"
"log"
"os"
"time"

"github.com/xgfone/bt/downloader"
"github.com/xgfone/bt/metainfo"
pp "github.com/xgfone/bt/peerprotocol"
"github.com/xgfone/bt/tracker"
)

var peeraddr string

func init() {
flag.StringVar(&peeraddr, "peeraddr", "", "The address of the peer storing the file.")
}

func getPeersFromTrackers(id, infohash metainfo.Hash, trackers []string) (peers []string) {
c, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

resp := tracker.GetPeers(c, id, infohash, trackers)
for _, r := range resp {
for _, addr := range r.Resp.Addresses {
addrs := addr.String()
nonexist := true
for _, peer := range peers {
if peer == addrs {
nonexist = false
break
}
}

if nonexist {
peers = append(peers, addrs)
}
}
}

return
}

func main() {
flag.Parse()

torrentfile := os.Args[1]
mi, err := metainfo.LoadFromFile(torrentfile)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

id := metainfo.NewRandomHash()
infohash := mi.InfoHash()
info, err := mi.Info()
if err != nil {
fmt.Println(err)
os.Exit(1)
}

var peers []string
if peeraddr != "" {
peers = []string{peeraddr}
} else {
// Get the peers from the trackers in the torrent file.
trackers := mi.Announces().Unique()
if len(trackers) == 0 {
fmt.Println("no trackers")
return
}

peers = getPeersFromTrackers(id, infohash, trackers)
if len(peers) == 0 {
fmt.Println("no peers")
return
}
}

// We save the downloaded file to the current directory.
w := metainfo.NewWriter("", info, 0)
defer w.Close()

// We don't request the blocks from the remote peers concurrently,
// and it is only an example. But you can do it concurrently.
dm := newDownloadManager(w, info)
for peerslen := len(peers); peerslen > 0 && !dm.IsFinished(); {
peerslen--
peer := peers[peerslen]
peers = peers[:peerslen]
downloadFileFromPeer(peer, id, infohash, dm)
}
}

func downloadFileFromPeer(peer string, id, infohash metainfo.Hash, dm *downloadManager) {
pc, err := pp.NewPeerConnByDial(peer, id, infohash, time.Second*3)
if err != nil {
log.Printf("fail to dial '%s'", peer)
return
}
defer pc.Close()

dm.doing = false
pc.Timeout = time.Second * 10
if err = pc.Handshake(); err != nil {
log.Printf("fail to handshake with '%s': %s", peer, err)
return
}

info := dm.writer.Info()
bdh := downloader.NewBlockDownloadHandler(info, dm.OnBlock, dm.RequestBlock)
if err = bdh.OnHandShake(pc); err != nil {
log.Printf("handshake error with '%s': %s", peer, err)
return
}

var msg pp.Message
for !dm.IsFinished() {
switch msg, err = pc.ReadMsg(); err {
case nil:
switch err = pc.HandleMessage(msg, bdh); err {
case nil, pp.ErrChoked:
default:
log.Printf("fail to handle the msg from '%s': %s", peer, err)
return
}
case io.EOF:
log.Printf("got EOF from '%s'", peer)
return
default:
log.Printf("fail to read the msg from '%s': %s", peer, err)
return
}
}
}

func newDownloadManager(w metainfo.Writer, info metainfo.Info) *downloadManager {
length := info.Piece(0).Length()
return &downloadManager{writer: w, plength: length}
}

type downloadManager struct {
writer metainfo.Writer
pindex uint32
poffset uint32
plength int64
doing bool
}

func (dm *downloadManager) IsFinished() bool {
if dm.pindex >= uint32(dm.writer.Info().CountPieces()) {
return true
}
return false
}

func (dm *downloadManager) OnBlock(index, offset uint32, b []byte) (err error) {
if dm.pindex != index {
return fmt.Errorf("inconsistent piece: old=%d, new=%d", dm.pindex, index)
} else if dm.poffset != offset {
return fmt.Errorf("inconsistent offset for piece '%d': old=%d, new=%d",
index, dm.poffset, offset)
}

dm.doing = false
n, err := dm.writer.WriteBlock(index, offset, b)
if err == nil {
dm.poffset = offset + uint32(n)
dm.plength -= int64(n)
}
return
}

func (dm *downloadManager) RequestBlock(pc *pp.PeerConn) (err error) {
if dm.doing {
return
}

if dm.plength <= 0 {
dm.pindex++
if dm.IsFinished() {
return
}

dm.poffset = 0
dm.plength = dm.writer.Info().Piece(int(dm.pindex)).Length()
}

index := dm.pindex
begin := dm.poffset
length := uint32(downloader.BlockSize)
if length > uint32(dm.plength) {
length = uint32(dm.plength)
}

log.Printf("Request Block from '%s': index=%d, offset=%d, length=%d",
pc.RemoteAddr().String(), index, begin, length)
if err = pc.SendRequest(index, begin, length); err == nil {
dm.doing = true
}
return
}
```
See [godoc](https://pkg.go.dev/github.com/xgfone/bt) or [bttools](https://github.com/xgfone/bttools).
53 changes: 23 additions & 30 deletions downloader/block_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,53 @@

package downloader

import (
"github.com/xgfone/bt/metainfo"
pp "github.com/xgfone/bt/peerprotocol"
)
import pp "github.com/xgfone/bt/peerprotocol"

// BlockDownloadHandler is used to downloads the files in the torrent file.
type BlockDownloadHandler struct {
pp.NoopHandler
pp.NoopBep3Handler
pp.NoopBep6Handler

Info metainfo.Info // Required
OnBlock func(index, offset uint32, b []byte) error // Required
RequestBlock func(c *pp.PeerConn) error // Required
OnBlock func(index, offset uint32, b []byte) error
ReqBlock func(c *pp.PeerConn) error
PieceNum int
}

// NewBlockDownloadHandler returns a new BlockDownloadHandler.
func NewBlockDownloadHandler(info metainfo.Info,
onBlock func(pieceIndex, pieceOffset uint32, b []byte) error,
requestBlock func(c *pp.PeerConn) error) BlockDownloadHandler {
func NewBlockDownloadHandler(pieceNum int, reqBlock func(c *pp.PeerConn) error,
onBlock func(pieceIndex, pieceOffset uint32, b []byte) error) BlockDownloadHandler {
return BlockDownloadHandler{
Info: info,
OnBlock: onBlock,
RequestBlock: requestBlock,
OnBlock: onBlock,
ReqBlock: reqBlock,
PieceNum: pieceNum,
}
}

// OnHandShake implements the interface Handler#OnHandShake.
//
// Notice: it uses the field Data to store the inner data, you mustn't override
// it.
func (fd BlockDownloadHandler) OnHandShake(c *pp.PeerConn) (err error) {
if err = c.SetUnchoked(); err == nil {
err = c.SetInterested()
}
return
}

/// ---------------------------------------------------------------------------
/// BEP 3

func (fd BlockDownloadHandler) request(pc *pp.PeerConn) (err error) {
if fd.ReqBlock == nil {
return nil
}

if pc.PeerChoked {
err = pp.ErrChoked
} else {
err = fd.RequestBlock(pc)
err = fd.ReqBlock(pc)
}
return
}

// Piece implements the interface Bep3Handler#Piece.
func (fd BlockDownloadHandler) Piece(c *pp.PeerConn, i, b uint32, p []byte) (err error) {
if err = fd.OnBlock(i, b, p); err == nil {
err = fd.request(c)
func (fd BlockDownloadHandler) Piece(c *pp.PeerConn, i, b uint32, p []byte) error {
if fd.OnBlock != nil {
if err := fd.OnBlock(i, b, p); err != nil {
return err
}
}
return
return fd.request(c)
}

// Unchoke implements the interface Bep3Handler#Unchoke.
Expand All @@ -88,7 +79,9 @@ func (fd BlockDownloadHandler) Have(pc *pp.PeerConn, index uint32) (err error) {

// HaveAll implements the interface Bep6Handler#HaveAll.
func (fd BlockDownloadHandler) HaveAll(pc *pp.PeerConn) (err error) {
pc.BitField = pp.NewBitField(fd.Info.CountPieces(), true)
if fd.PieceNum > 0 {
pc.BitField = pp.NewBitField(fd.PieceNum, true)
}
return
}

Expand Down
3 changes: 3 additions & 0 deletions metainfo/piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/xgfone/bt/internal/helper"
)

// BlockSize is the default size of a piece block.
const BlockSize = 16 * 1024 // 2^14 = 16KB

// Predefine some sizes of the pieces.
const (
PieceSize256KB = 1024 * 256
Expand Down
2 changes: 1 addition & 1 deletion metainfo/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (w *writer) Close() error {
return nil
}

// WriteBlock writes a data block.
// WriteBlock writes a block data.
func (w *writer) WriteBlock(pieceIndex, pieceOffset uint32, p []byte) (int, error) {
return w.WriteAt(p, w.info.PieceOffset(pieceIndex, pieceOffset))
}
Expand Down
13 changes: 11 additions & 2 deletions peerprotocol/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,27 @@ func (bf BitField) Unsets() (pieces Pieces) {
return
}

// CanSet reports whether the index can be set.
func (bf BitField) CanSet(index uint32) bool {
return (int(index) / 8) < len(bf)
}

// Set sets the bit of the piece to 1 by its index.
func (bf BitField) Set(index uint32) {
func (bf BitField) Set(index uint32) (ok bool) {
if i := int(index) / 8; i < len(bf) {
bf[i] |= (1 << byte(7-index%8))
ok = true
}
return
}

// Unset sets the bit of the piece to 0 by its index.
func (bf BitField) Unset(index uint32) {
func (bf BitField) Unset(index uint32) (ok bool) {
if i := int(index) / 8; i < len(bf) {
bf[i] &^= (1 << byte(7-index%8))
ok = true
}
return
}

// IsSet reports whether the bit of the piece is set to 1.
Expand Down
Loading

0 comments on commit fad4a74

Please sign in to comment.