From 9baf001e60630e39dfaf82a87ebabe2daa402ca2 Mon Sep 17 00:00:00 2001 From: Marco Vidonis <31407403+marcovidonis@users.noreply.github.com> Date: Fri, 9 Feb 2024 17:01:03 +0000 Subject: [PATCH] Handle connection to torrent peer status update messages (#4) * basic observer framework * fleshing out the tracker status * set up provision for onPeerConnUpdate function * add provision for peer conn state update channel * connects to a valid tracker * added observer channel for announce status * set up provision for onPeerConnUpdate function * add provision for peer conn state update channel * set up Peer Connection status Observers * add missing Observer initialisation * add PeerConn test: connection established * add comment on torrent used for PeerConn testing * merged PeerObserver commits, moved readChannelTimeout * added failure case for AnnounceStatus observer * added Observers factory method * Added Event to AnnounceStatus, with embedded TrackerStatus * AnnounceStatus details and error reporting * state updates must be non-blocking * json annotations and use matching infoHash string * add unit tests on PeerConn Observer status reading * refactor test by doing direct transfer locally * add test and debug log on dropped connection * add provision for test on PeerID * remove unused OnPeerConnUpdate function * change Err field in PeerStatus to string The main use for this status is to send update messages through the WS, in the form of JSON. Marshalling a JSON field into an error object doesn't work in Go, so it's better to send the error message as string. See https://github.com/golang/go/issues/5161 * add PeerID check to test * remove unused method --------- Co-authored-by: Parker Whittle --- client-peerconn_test.go | 231 +++++++++++++++++++++++++++++++++++ client-tracker_test.go | 166 +++++++++++++++++++++++++ client.go | 27 +++- config.go | 20 +++ peerconn.go | 21 ++++ testing.go | 11 ++ torrent.go | 14 +++ webtorrent/tracker-client.go | 64 ++++++++++ wstracker.go | 4 +- 9 files changed, 556 insertions(+), 2 deletions(-) create mode 100644 client-peerconn_test.go create mode 100644 client-tracker_test.go diff --git a/client-peerconn_test.go b/client-peerconn_test.go new file mode 100644 index 0000000000..3c27bdadb1 --- /dev/null +++ b/client-peerconn_test.go @@ -0,0 +1,231 @@ +package torrent + +import ( + "io" + "os" + "testing" + "testing/iotest" + "time" + + "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/types" + "github.com/frankban/quicktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestPeerConnObserverReadStatusOk(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = &Observers{ + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } + + c, _ := NewClient(cfg) + defer c.Close() + + go func() { + cfg.Observers.Peers.PeerStatus <- PeerStatus{ + Ok: true, + } + }() + + status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.True(t, status.Ok) + require.Equal(t, "", status.Err) +} + +func TestPeerConnObserverReadStatusErr(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = &Observers{ + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } + + c, _ := NewClient(cfg) + defer c.Close() + + go func() { + cfg.Observers.Peers.PeerStatus <- PeerStatus{ + Err: "test error", + } + }() + + status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.False(t, status.Ok) + require.Equal(t, status.Err, "test error") +} + +func TestPeerConnEstablished(t *testing.T) { + obs := NewClientObservers() + ps := testClientTransferParams{ + ConfigureSeeder: ConfigureClient{ + Config: func(cfg *ClientConfig) { + cfg.PeerID = "12345123451234512345" + }, + }, + ConfigureLeecher: ConfigureClient{ + Config: func(cfg *ClientConfig) { + // TODO one of UTP or TCP is needed for the transfer + // Does this mean we're not doing webtorrent? TBC + // cfg.DisableUTP = true + cfg.DisableTCP = true + cfg.Debug = false + cfg.DisableTrackers = true + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = obs + }, + }, + } + + go testClientTransfer(t, ps) + + status := readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + var expectedPeerId types.PeerID + missinggo.CopyExact(&expectedPeerId, "12345123451234512345") + require.Equal(t, expectedPeerId, status.Id) + require.True(t, status.Ok) + require.Equal(t, "", status.Err) + + // Peer conn is dropped after transfer is finished. This is the next update we receive. + status = readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.Equal(t, expectedPeerId, status.Id) + require.False(t, status.Ok) + require.Equal(t, "", status.Err) +} + +type ConfigureClient struct { + Config func(cfg *ClientConfig) + Client func(cl *Client) +} + +type testClientTransferParams struct { + SeederUploadRateLimiter *rate.Limiter + LeecherDownloadRateLimiter *rate.Limiter + ConfigureSeeder ConfigureClient + ConfigureLeecher ConfigureClient + + LeecherStartsWithoutMetadata bool +} + +// Simplified version of testClientTransfer found in test/leecher-storage.go. +// Could not import and reuse that function due to circular dependencies between modules. +func testClientTransfer(t *testing.T, ps testClientTransferParams) { + greetingTempDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(greetingTempDir) + // Create seeder and a Torrent. + cfg := TestingConfig(t) + cfg.Seed = true + // Some test instances don't like this being on, even when there's no cache involved. + cfg.DropMutuallyCompletePeers = false + if ps.SeederUploadRateLimiter != nil { + cfg.UploadRateLimiter = ps.SeederUploadRateLimiter + } + cfg.DataDir = greetingTempDir + if ps.ConfigureSeeder.Config != nil { + ps.ConfigureSeeder.Config(cfg) + } + seeder, err := NewClient(cfg) + require.NoError(t, err) + if ps.ConfigureSeeder.Client != nil { + ps.ConfigureSeeder.Client(seeder) + } + seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + defer seeder.Close() + <-seederTorrent.Complete.On() + + // Create leecher and a Torrent. + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) + // See the seeder client config comment. + cfg.DropMutuallyCompletePeers = false + cfg.DataDir = leecherDataDir + if ps.LeecherDownloadRateLimiter != nil { + cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter + } + cfg.Seed = false + if ps.ConfigureLeecher.Config != nil { + ps.ConfigureLeecher.Config(cfg) + } + leecher, err := NewClient(cfg) + require.NoError(t, err) + defer leecher.Close() + if ps.ConfigureLeecher.Client != nil { + ps.ConfigureLeecher.Client(leecher) + } + leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + if ps.LeecherStartsWithoutMetadata { + ret.InfoBytes = nil + } + return + }()) + require.NoError(t, err) + assert.False(t, leecherTorrent.Complete.Bool()) + assert.True(t, new) + + added := leecherTorrent.AddClientPeer(seeder) + assert.False(t, leecherTorrent.Seeding()) + // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they + // should be sitting idle until we demand data. + if !ps.LeecherStartsWithoutMetadata { + assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers) + } + if ps.LeecherStartsWithoutMetadata { + <-leecherTorrent.GotInfo() + } + r := leecherTorrent.NewReader() + defer r.Close() + go leecherTorrent.SetInfoBytes(mi.InfoBytes) + + assertReadAllGreeting(t, r) + <-leecherTorrent.Complete.On() + assert.NotEmpty(t, seederTorrent.PeerConns()) + leecherPeerConns := leecherTorrent.PeerConns() + if cfg.DropMutuallyCompletePeers { + // I don't think we can assume it will be empty already, due to timing. + // assert.Empty(t, leecherPeerConns) + } else { + assert.NotEmpty(t, leecherPeerConns) + } + foundSeeder := false + for _, pc := range leecherPeerConns { + completed := pc.PeerPieces().GetCardinality() + t.Logf("peer conn %v has %v completed pieces", pc, completed) + if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) { + foundSeeder = true + } + } + if !foundSeeder { + t.Errorf("didn't find seeder amongst leecher peer conns") + } + + seederStats := seederTorrent.Stats() + assert.True(t, 13 <= seederStats.BytesWrittenData.Int64()) + assert.True(t, 8 <= seederStats.ChunksWritten.Int64()) + + leecherStats := leecherTorrent.Stats() + assert.True(t, 13 <= leecherStats.BytesReadData.Int64()) + assert.True(t, 8 <= leecherStats.ChunksRead.Int64()) + + // Try reading through again for the cases where the torrent data size + // exceeds the size of the cache. + assertReadAllGreeting(t, r) +} + +func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { + pos, err := r.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.EqualValues(t, 0, pos) + quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) +} diff --git a/client-tracker_test.go b/client-tracker_test.go new file mode 100644 index 0000000000..a75e861a25 --- /dev/null +++ b/client-tracker_test.go @@ -0,0 +1,166 @@ +package torrent + +import ( + "errors" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/tracker" + "github.com/anacrolix/torrent/webtorrent" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" +) + +func TestClientInvalidTracker(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {"ws://test.invalid:4242"}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus) + require.Equal(t, "ws://test.invalid:4242", status.Url) + var expected *net.OpError + require.ErrorAs(t, expected, &status.Err) + + to.Drop() +} + +var upgrader = websocket.Upgrader{} + +func testtracker(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer c.Close() + for { + _, _, err := c.ReadMessage() + if err != nil { + break + } + //err = c.WriteMessage(mt, message) + //if err != nil { + // break + //} + } +} + +func TestClientValidTrackerConn(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus) + require.Equal(t, trackerUrl, status.Url) + require.True(t, status.Ok) + require.Nil(t, status.Err) + + to.Drop() +} + +func TestClientAnnounceFailure(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) { + return tracker.AnnounceRequest{}, errors.New("test error") + } + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus) + require.Equal(t, trackerUrl, status.Url) + require.False(t, status.Ok) + require.EqualError(t, status.Err, "test error") + require.Empty(t, status.Event) + + to.Drop() +} + +func TestClientAnnounceSuccess(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus) + require.Equal(t, trackerUrl, status.Url) + require.True(t, status.Ok) + require.Nil(t, status.Err) + require.Equal(t, "started", status.Event) + + to.Drop() +} + +func startTestTracker() (*httptest.Server, string) { + s := httptest.NewServer(http.HandlerFunc(testtracker)) + trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http") + return s, trackerUrl +} diff --git a/client.go b/client.go index 1ea1d61048..77f60fd6a8 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,8 @@ import ( "strconv" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -48,7 +50,6 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/types/infohash" - "github.com/anacrolix/torrent/webtorrent" ) // Clients contain zero or more Torrents. A Client manages a blocklist, the @@ -290,7 +291,12 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } } + var obs *webtorrent.TrackerObserver + if cl.config.Observers != nil { + obs = &cl.config.Observers.Trackers + } cl.websocketTrackers = websocketTrackers{ + obs: obs, PeerId: cl.peerID, Logger: cl.logger, GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) { @@ -719,6 +725,11 @@ func doProtocolHandshakeOnDialResult( cl := t.cl nc := dr.Conn addrIpPort, _ := tryIpPortFromNetAddr(addr) + + var obs *PeerObserver + if t.cl.config.Observers != nil { + obs = &t.cl.config.Observers.Peers + } c, err = cl.initiateProtocolHandshakes( context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ @@ -728,6 +739,7 @@ func doProtocolHandshakeOnDialResult( localPublicAddr: cl.publicAddr(addrIpPort.IP), network: dr.Dialer.DialerNetwork(), connString: regularNetConnPeerConnConnString(nc), + obs: obs, }) if err != nil { nc.Close() @@ -1080,8 +1092,19 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { c.startMessageWriter() cl.sendInitialMessages(c, t) c.initUpdateRequestsTimer() + + c.UpdatePeerConnStatus(PeerStatus{ + Id: c.PeerID, + Ok: true, + }) + err := c.mainReadLoop() if err != nil { + c.UpdatePeerConnStatus(PeerStatus{ + Id: c.PeerID, + Ok: false, + Err: fmt.Sprintf("%s", err), + }) return fmt.Errorf("main read loop: %w", err) } return nil @@ -1568,6 +1591,7 @@ type newConnectionOpts struct { localPublicAddr peerLocalPublicAddr network string connString string + obs *PeerObserver } func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) { @@ -1588,6 +1612,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon }, connString: opts.connString, conn: nc, + Observers: opts.obs, } c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn c.initRequestState() diff --git a/config.go b/config.go index e2d0ea1ebe..aacf0fd3ba 100644 --- a/config.go +++ b/config.go @@ -7,6 +7,8 @@ import ( "net/url" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" @@ -19,6 +21,23 @@ import ( "github.com/anacrolix/torrent/version" ) +type Observers struct { + Trackers webtorrent.TrackerObserver + Peers PeerObserver +} + +func NewClientObservers() *Observers { + return &Observers{ + Trackers: webtorrent.TrackerObserver{ + ConnStatus: make(chan webtorrent.TrackerStatus), + AnnounceStatus: make(chan webtorrent.AnnounceStatus), + }, + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } +} + // Contains config elements that are exclusive to tracker handling. There may be other fields in // ClientConfig that are also relevant. type ClientTrackerConfig struct { @@ -31,6 +50,7 @@ type ClientTrackerConfig struct { // Takes a tracker's hostname and requests DNS A and AAAA records. // Used in case DNS lookups require a special setup (i.e., dns-over-https) LookupTrackerIp func(*url.URL) ([]net.IP, error) + Observers *Observers } type ClientDhtConfig struct { diff --git a/peerconn.go b/peerconn.go index e2d944ff26..6df295777b 100644 --- a/peerconn.go +++ b/peerconn.go @@ -32,6 +32,16 @@ import ( utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) +type PeerStatus struct { + Id PeerID + Ok bool + Err string // see https://github.com/golang/go/issues/5161 +} + +type PeerObserver struct { + PeerStatus chan PeerStatus +} + // Maintains the state of a BitTorrent-protocol based connection with a peer. type PeerConn struct { Peer @@ -69,6 +79,8 @@ type PeerConn struct { peerRequestDataAllocLimiter alloclim.Limiter outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} + + Observers *PeerObserver } func (cn *PeerConn) pexStatus() string { @@ -1129,3 +1141,12 @@ func (c *PeerConn) useful() bool { } return false } + +func (c *PeerConn) UpdatePeerConnStatus(status PeerStatus) { + if c.Observers != nil { + select { + case c.Observers.PeerStatus <- status: + default: + } + } +} diff --git a/testing.go b/testing.go index 6fb5411267..24f4adfabf 100644 --- a/testing.go +++ b/testing.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/stretchr/testify/require" "testing" "time" @@ -35,3 +36,13 @@ func TestingConfig(t testing.TB) *ClientConfig { //}) return cfg } + +func readChannelTimeout[T any](t *testing.T, channel chan T, duration time.Duration) interface{} { + select { + case s := <-channel: + return s + case <-time.After(duration): + require.Fail(t, "Timeout reading observer channel.") + } + return nil +} diff --git a/torrent.go b/torrent.go index 6385a3fc02..7b541f2d9b 100644 --- a/torrent.go +++ b/torrent.go @@ -1592,6 +1592,13 @@ func (t *Torrent) assertPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() c.close() + + c.UpdatePeerConnStatus(PeerStatus{ + Id: c.PeerID, + Ok: false, + }) + t.logger.WithDefaultLevel(log.Debug).Printf("dropping connection to %+q, sent peerconn update", c.PeerID) + if t.deletePeerConn(c) { t.openNewConns() } @@ -1652,6 +1659,12 @@ func (t *Torrent) onWebRtcConn( return } localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) + + var obs *PeerObserver + if t.cl.config.Observers != nil { + obs = &t.cl.config.Observers.Peers + } + pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, @@ -1663,6 +1676,7 @@ func (t *Torrent) onWebRtcConn( localPublicAddr: localAddrIpPort, network: webrtcNetwork, connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + obs: obs, }, ) if err != nil { diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index bc9dab312e..7cdfc920f8 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/json" "fmt" + "github.com/anacrolix/torrent/types/infohash" "net/http" "sync" "time" @@ -19,6 +20,23 @@ import ( "github.com/anacrolix/torrent/tracker" ) +type TrackerStatus struct { + Url string `json:"url"` + Ok bool `json:"ok"` + Err error `json:"err"` +} + +type AnnounceStatus struct { + TrackerStatus + Event string `json:"event"` + InfoHash string `json:"info_hash"` +} + +type TrackerObserver struct { + ConnStatus chan TrackerStatus + AnnounceStatus chan AnnounceStatus +} + type TrackerClientStats struct { Dials int64 ConvertedInboundConns int64 @@ -33,6 +51,7 @@ type TrackerClient struct { OnConn onDataChannelOpen Logger log.Logger Dialer *websocket.Dialer + Observers *TrackerObserver mu sync.Mutex cond sync.Cond @@ -98,6 +117,7 @@ func (tc *TrackerClient) doWebsocket() error { c, _, err := tc.Dialer.Dial(tc.Url, header) if err != nil { + tc.updateTrackerConnStatus(TrackerStatus{tc.Url, false, err}) return fmt.Errorf("dialing tracker: %w", err) } defer c.Close() @@ -124,6 +144,7 @@ func (tc *TrackerClient) doWebsocket() error { } } }() + tc.updateTrackerConnStatus(TrackerStatus{tc.Url, true, nil}) err = tc.trackerReadLoop(tc.wsConn) close(closeChan) tc.mu.Lock() @@ -132,6 +153,24 @@ func (tc *TrackerClient) doWebsocket() error { return err } +func (tc *TrackerClient) updateTrackerConnStatus(status TrackerStatus) { + if tc.Observers != nil { + select { + case tc.Observers.ConnStatus <- status: + default: + } + } +} + +func (tc *TrackerClient) updateTrackerAnnounceStatus(status AnnounceStatus) { + if tc.Observers != nil { + select { + case tc.Observers.AnnounceStatus <- status: + default: + } + } +} + // Finishes initialization and spawns the run routine, calling onStop when it completes with the // result. We don't let the caller just spawn the runner directly, since then we can race against // .Close to finish initialization. @@ -254,6 +293,17 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error { request, err := tc.GetAnnounceRequest(event, infoHash) + if err != nil { + tc.updateTrackerAnnounceStatus(AnnounceStatus{ + TrackerStatus: TrackerStatus{ + Url: tc.Url, + Ok: false, + Err: err, + }, + Event: "", + InfoHash: infohash.HashBytes(infoHash[:]).HexString(), + }) + } if err != nil { return fmt.Errorf("getting announce parameters: %w", err) } @@ -275,6 +325,16 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte }) } + announceStatus := AnnounceStatus{ + TrackerStatus: TrackerStatus{ + Url: tc.Url, + Ok: true, + Err: nil, + }, + Event: req.Event, + InfoHash: binaryToJsonString(infoHash[:]), + } + data, err := json.Marshal(req) if err != nil { return fmt.Errorf("marshalling request: %w", err) @@ -284,8 +344,12 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte defer tc.mu.Unlock() err = tc.writeMessage(data) if err != nil { + announceStatus.Ok = false + announceStatus.Err = err + tc.updateTrackerAnnounceStatus(announceStatus) return fmt.Errorf("write AnnounceRequest: %w", err) } + tc.updateTrackerAnnounceStatus(announceStatus) for _, offer := range offers { g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue) } diff --git a/wstracker.go b/wstracker.go index 84af9cbfc1..4a4693a374 100644 --- a/wstracker.go +++ b/wstracker.go @@ -3,6 +3,7 @@ package torrent import ( "context" "fmt" + "github.com/anacrolix/torrent/webtorrent" "net" netHttp "net/http" "net/url" @@ -14,7 +15,6 @@ import ( "github.com/anacrolix/torrent/tracker" httpTracker "github.com/anacrolix/torrent/tracker/http" - "github.com/anacrolix/torrent/webtorrent" ) type websocketTrackerStatus struct { @@ -42,6 +42,7 @@ type websocketTrackers struct { OnConn func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext) mu sync.Mutex clients map[string]*refCountedWebtorrentTrackerClient + obs *webtorrent.TrackerObserver Proxy httpTracker.ProxyFunc DialContext func(ctx context.Context, network, addr string) (net.Conn, error) WebsocketTrackerHttpHeader func() netHttp.Header @@ -56,6 +57,7 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra dialer := &websocket.Dialer{Proxy: me.Proxy, NetDialContext: me.DialContext, HandshakeTimeout: websocket.DefaultDialer.HandshakeTimeout} value = &refCountedWebtorrentTrackerClient{ TrackerClient: webtorrent.TrackerClient{ + Observers: me.obs, Dialer: dialer, Url: url, GetAnnounceRequest: me.GetAnnounceRequest,