Skip to content

Commit

Permalink
Handle connection to torrent peer status update messages (#4)
Browse files Browse the repository at this point in the history
* basic observer framework

* fleshing out the tracker status

* set up provision for onPeerConnUpdate function

* add provision for peer conn state update channel

* connects to a valid tracker

* added observer channel for announce status

* set up provision for onPeerConnUpdate function

* add provision for peer conn state update channel

* set up Peer Connection status Observers

* add missing Observer initialisation

* add PeerConn test: connection established

* add comment on torrent used for PeerConn testing

* merged PeerObserver commits, moved readChannelTimeout

* added failure case for AnnounceStatus observer

* added Observers factory method

* Added Event to AnnounceStatus, with embedded TrackerStatus

* AnnounceStatus details and error reporting

* state updates must be non-blocking

* json annotations and use matching infoHash string

* add unit tests on PeerConn Observer status reading

* refactor test by doing direct transfer locally

* add test and debug log on dropped connection

* add provision for test on PeerID

* remove unused OnPeerConnUpdate function

* change Err field in PeerStatus to string

The main use for this status is to send update messages through the WS,
in the form of JSON. Marshalling a JSON field into an error object
doesn't work in Go, so it's better to send the error message as string.
See golang/go#5161

* add PeerID check to test

* remove unused method

---------

Co-authored-by: Parker Whittle <pwhittle@medicom.us>
  • Loading branch information
marcovidonis and pwhittle-medicom authored Feb 9, 2024
1 parent e10ffd1 commit 9baf001
Show file tree
Hide file tree
Showing 9 changed files with 556 additions and 2 deletions.
231 changes: 231 additions & 0 deletions client-peerconn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package torrent

import (
"io"
"os"
"testing"
"testing/iotest"
"time"

"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/types"
"github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func TestPeerConnObserverReadStatusOk(t *testing.T) {
cfg := TestingConfig(t)
cfg.DisableTrackers = false
cfg.EstablishedConnsPerTorrent = 1
cfg.Observers = &Observers{
Peers: PeerObserver{
PeerStatus: make(chan PeerStatus),
},
}

c, _ := NewClient(cfg)
defer c.Close()

go func() {
cfg.Observers.Peers.PeerStatus <- PeerStatus{
Ok: true,
}
}()

status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus)
require.True(t, status.Ok)
require.Equal(t, "", status.Err)
}

func TestPeerConnObserverReadStatusErr(t *testing.T) {
cfg := TestingConfig(t)
cfg.DisableTrackers = false
cfg.EstablishedConnsPerTorrent = 1
cfg.Observers = &Observers{
Peers: PeerObserver{
PeerStatus: make(chan PeerStatus),
},
}

c, _ := NewClient(cfg)
defer c.Close()

go func() {
cfg.Observers.Peers.PeerStatus <- PeerStatus{
Err: "test error",
}
}()

status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus)
require.False(t, status.Ok)
require.Equal(t, status.Err, "test error")
}

func TestPeerConnEstablished(t *testing.T) {
obs := NewClientObservers()
ps := testClientTransferParams{
ConfigureSeeder: ConfigureClient{
Config: func(cfg *ClientConfig) {
cfg.PeerID = "12345123451234512345"
},
},
ConfigureLeecher: ConfigureClient{
Config: func(cfg *ClientConfig) {
// TODO one of UTP or TCP is needed for the transfer
// Does this mean we're not doing webtorrent? TBC
// cfg.DisableUTP = true
cfg.DisableTCP = true
cfg.Debug = false
cfg.DisableTrackers = true
cfg.EstablishedConnsPerTorrent = 1
cfg.Observers = obs
},
},
}

go testClientTransfer(t, ps)

status := readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus)
var expectedPeerId types.PeerID
missinggo.CopyExact(&expectedPeerId, "12345123451234512345")
require.Equal(t, expectedPeerId, status.Id)
require.True(t, status.Ok)
require.Equal(t, "", status.Err)

// Peer conn is dropped after transfer is finished. This is the next update we receive.
status = readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus)
require.Equal(t, expectedPeerId, status.Id)
require.False(t, status.Ok)
require.Equal(t, "", status.Err)
}

type ConfigureClient struct {
Config func(cfg *ClientConfig)
Client func(cl *Client)
}

type testClientTransferParams struct {
SeederUploadRateLimiter *rate.Limiter
LeecherDownloadRateLimiter *rate.Limiter
ConfigureSeeder ConfigureClient
ConfigureLeecher ConfigureClient

LeecherStartsWithoutMetadata bool
}

// Simplified version of testClientTransfer found in test/leecher-storage.go.
// Could not import and reuse that function due to circular dependencies between modules.
func testClientTransfer(t *testing.T, ps testClientTransferParams) {
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
// Create seeder and a Torrent.
cfg := TestingConfig(t)
cfg.Seed = true
// Some test instances don't like this being on, even when there's no cache involved.
cfg.DropMutuallyCompletePeers = false
if ps.SeederUploadRateLimiter != nil {
cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
}
cfg.DataDir = greetingTempDir
if ps.ConfigureSeeder.Config != nil {
ps.ConfigureSeeder.Config(cfg)
}
seeder, err := NewClient(cfg)
require.NoError(t, err)
if ps.ConfigureSeeder.Client != nil {
ps.ConfigureSeeder.Client(seeder)
}
seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
defer seeder.Close()
<-seederTorrent.Complete.On()

// Create leecher and a Torrent.
leecherDataDir := t.TempDir()
cfg = TestingConfig(t)
// See the seeder client config comment.
cfg.DropMutuallyCompletePeers = false
cfg.DataDir = leecherDataDir
if ps.LeecherDownloadRateLimiter != nil {
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
}
cfg.Seed = false
if ps.ConfigureLeecher.Config != nil {
ps.ConfigureLeecher.Config(cfg)
}
leecher, err := NewClient(cfg)
require.NoError(t, err)
defer leecher.Close()
if ps.ConfigureLeecher.Client != nil {
ps.ConfigureLeecher.Client(leecher)
}
leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
if ps.LeecherStartsWithoutMetadata {
ret.InfoBytes = nil
}
return
}())
require.NoError(t, err)
assert.False(t, leecherTorrent.Complete.Bool())
assert.True(t, new)

added := leecherTorrent.AddClientPeer(seeder)
assert.False(t, leecherTorrent.Seeding())
// The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
// should be sitting idle until we demand data.
if !ps.LeecherStartsWithoutMetadata {
assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
}
if ps.LeecherStartsWithoutMetadata {
<-leecherTorrent.GotInfo()
}
r := leecherTorrent.NewReader()
defer r.Close()
go leecherTorrent.SetInfoBytes(mi.InfoBytes)

assertReadAllGreeting(t, r)
<-leecherTorrent.Complete.On()
assert.NotEmpty(t, seederTorrent.PeerConns())
leecherPeerConns := leecherTorrent.PeerConns()
if cfg.DropMutuallyCompletePeers {
// I don't think we can assume it will be empty already, due to timing.
// assert.Empty(t, leecherPeerConns)
} else {
assert.NotEmpty(t, leecherPeerConns)
}
foundSeeder := false
for _, pc := range leecherPeerConns {
completed := pc.PeerPieces().GetCardinality()
t.Logf("peer conn %v has %v completed pieces", pc, completed)
if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
foundSeeder = true
}
}
if !foundSeeder {
t.Errorf("didn't find seeder amongst leecher peer conns")
}

seederStats := seederTorrent.Stats()
assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
assert.True(t, 8 <= seederStats.ChunksWritten.Int64())

leecherStats := leecherTorrent.Stats()
assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
assert.True(t, 8 <= leecherStats.ChunksRead.Int64())

// Try reading through again for the cases where the torrent data size
// exceeds the size of the cache.
assertReadAllGreeting(t, r)
}

func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
pos, err := r.Seek(0, io.SeekStart)
assert.NoError(t, err)
assert.EqualValues(t, 0, pos)
quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
}
166 changes: 166 additions & 0 deletions client-tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package torrent

import (
"errors"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
"net"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
)

func TestClientInvalidTracker(t *testing.T) {
cfg := TestingConfig(t)
cfg.DisableTrackers = false
cfg.Observers = NewClientObservers()

cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()

dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)

mi.AnnounceList = [][]string{
{"ws://test.invalid:4242"},
}

to, err := cl.AddTorrent(mi)
require.NoError(t, err)

status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus)
require.Equal(t, "ws://test.invalid:4242", status.Url)
var expected *net.OpError
require.ErrorAs(t, expected, &status.Err)

to.Drop()
}

var upgrader = websocket.Upgrader{}

func testtracker(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer c.Close()
for {
_, _, err := c.ReadMessage()
if err != nil {
break
}
//err = c.WriteMessage(mt, message)
//if err != nil {
// break
//}
}
}

func TestClientValidTrackerConn(t *testing.T) {
s, trackerUrl := startTestTracker()
defer s.Close()

cfg := TestingConfig(t)
cfg.DisableTrackers = false
cfg.Observers = NewClientObservers()

cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()

dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)

mi.AnnounceList = [][]string{
{trackerUrl},
}

to, err := cl.AddTorrent(mi)
require.NoError(t, err)

status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus)
require.Equal(t, trackerUrl, status.Url)
require.True(t, status.Ok)
require.Nil(t, status.Err)

to.Drop()
}

func TestClientAnnounceFailure(t *testing.T) {
s, trackerUrl := startTestTracker()
defer s.Close()

cfg := TestingConfig(t)
cfg.DisableTrackers = false
cfg.Observers = NewClientObservers()

cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()

cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
return tracker.AnnounceRequest{}, errors.New("test error")
}

dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)

mi.AnnounceList = [][]string{
{trackerUrl},
}

to, err := cl.AddTorrent(mi)
require.NoError(t, err)

status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus)
require.Equal(t, trackerUrl, status.Url)
require.False(t, status.Ok)
require.EqualError(t, status.Err, "test error")
require.Empty(t, status.Event)

to.Drop()
}

func TestClientAnnounceSuccess(t *testing.T) {
s, trackerUrl := startTestTracker()
defer s.Close()

cfg := TestingConfig(t)
cfg.DisableTrackers = false
cfg.Observers = NewClientObservers()

cl, err := NewClient(cfg)
require.NoError(t, err)
defer cl.Close()

dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)

mi.AnnounceList = [][]string{
{trackerUrl},
}

to, err := cl.AddTorrent(mi)
require.NoError(t, err)

status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus)
require.Equal(t, trackerUrl, status.Url)
require.True(t, status.Ok)
require.Nil(t, status.Err)
require.Equal(t, "started", status.Event)

to.Drop()
}

func startTestTracker() (*httptest.Server, string) {
s := httptest.NewServer(http.HandlerFunc(testtracker))
trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http")
return s, trackerUrl
}
Loading

0 comments on commit 9baf001

Please sign in to comment.