Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PiecePicker for arbitrary things #536

Merged
merged 1 commit into from
Jun 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/Benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class TorrentInfo : ITorrentInfo
public string Name => "Name";
}

class TorrentData : ITorrentManagerInfo, IPieceRequesterData
class TorrentData : IPieceRequesterData
{
const int PieceCount = 500;

Expand All @@ -218,10 +218,10 @@ class TorrentData : ITorrentManagerInfo, IPieceRequesterData
int IPieceRequesterData.PieceCount => TorrentInfo.PieceCount ();
int IPieceRequesterData.PieceLength => TorrentInfo.PieceLength;

public IPeer CreatePeer ()
public Peer CreatePeer ()
=> new Peer (PieceCount);

int IPieceRequesterData.BlocksPerPiece (int piece)
int IPieceRequesterData.SegmentsPerPiece (int piece)
=> TorrentInfo.BlocksPerPiece (piece);

int IPieceRequesterData.ByteOffsetToPieceIndex (long byteOffset)
Expand All @@ -230,6 +230,25 @@ int IPieceRequesterData.ByteOffsetToPieceIndex (long byteOffset)
int IPieceRequesterData.BytesPerPiece (int piece)
=> TorrentInfo.BytesPerPiece (piece);

public void EnqueueRequest (IPeer peer, PieceSegment block)
{

}

public void EnqueueRequests (IPeer peer, Span<PieceSegment> blocks)
{

}

public void EnqueueCancellation (IPeer peer, PieceSegment segment)
{

}

public void EnqueueCancellations (IPeer peer, Span<PieceSegment> segments)
{

}
}

class Peer : IPeer
Expand Down Expand Up @@ -257,7 +276,7 @@ public Peer (int pieceCount)

readonly TorrentData Data;
readonly StandardPicker Picker;
readonly IPeer Requester;
readonly Peer Requester;
readonly Queue<PieceSegment> Requested;

public StandardPickerBenchmark ()
Expand Down
156 changes: 87 additions & 69 deletions src/MonoTorrent.Client/MonoTorrent.Client.Modes/MetadataMode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@


using System;
using System.Collections.Generic;
using System.IO;
using System.Security.Cryptography;
using System.Threading;
Expand All @@ -39,22 +40,71 @@
using MonoTorrent.Messages.Peer;
using MonoTorrent.Messages.Peer.FastPeer;
using MonoTorrent.Messages.Peer.Libtorrent;
using MonoTorrent.PiecePicking;

namespace MonoTorrent.Client.Modes
{
class MetadataMode : Mode
{
static readonly Logger logger = Logger.Create (nameof (MetadataMode));

BitField? bitField;
class MetadataData : IPieceRequesterData, IMessageEnqueuer
{
public IList<ITorrentManagerFile> Files => Array.Empty<ITorrentManagerFile> ();
public int PieceCount => 1;
public int PieceLength { get; }

int length;

public MetadataData (int size)
{
length = size;
PieceLength = (int) Math.Pow (2, Math.Ceiling (Math.Log (size, 2)) + 1);
}

public int BytesPerBlock(int pieceIndex, int blockIndex)
=> Math.Min (Constants.BlockSize, BytesPerPiece (pieceIndex) - blockIndex * Constants.BlockSize);

public int SegmentsPerPiece (int piece)
=> (length + Constants.BlockSize - 1) / Constants.BlockSize;

public int ByteOffsetToPieceIndex (long byteOffset)
=> 0;

public int BytesPerPiece (int piece)
=> length;

void IMessageEnqueuer.EnqueueRequest (IPeer peer, PieceSegment block)
{
var message = new LTMetadata (((PeerId) peer).ExtensionSupports, LTMetadata.MessageType.Request, block.BlockIndex);
((PeerId) peer).MessageQueue.Enqueue (message);
}

void IMessageEnqueuer.EnqueueRequests (IPeer peer, Span<PieceSegment> blocks)
{
foreach (var block in blocks)
((IMessageEnqueuer)this).EnqueueRequest (peer, block);
}

void IMessageEnqueuer.EnqueueCancellation (IPeer peer, PieceSegment segment)
{
// you can't cancel a request for metadata
}

void IMessageEnqueuer.EnqueueCancellations (IPeer peer, Span<PieceSegment> segments)
{
// you can't cancel a request for metadata
}
}

static readonly TimeSpan timeout = TimeSpan.FromSeconds (10);
PeerId? currentId;
string savePath;
DateTime requestTimeout;
bool stopWhenDone;

bool HasAnnounced { get; set; }
MemoryStream? Stream { get; set; }
MetadataData? RequesterData { get; set; }
IPieceRequester? Requester { get; set; }
byte[]? Stream { get; set; }

public override bool CanHashCheck => true;
public override TorrentState State => TorrentState.Metadata;
Expand All @@ -72,23 +122,23 @@ public MetadataMode (TorrentManager manager, DiskManager diskManager, Connection
this.stopWhenDone = stopWhenDone;
}

public override void HandlePeerDisconnected (PeerId id)
{
base.HandlePeerDisconnected (id);
if (Requester != null && RequesterData != null)
Requester.CancelRequests (id, 0, RequesterData.PieceCount);
}

public override void Tick (int counter)
{
if (!HasAnnounced) {
HasAnnounced = true;
SendAnnounces ();
}

//if one request have been sent and we have wait more than timeout
// request the next peer
if (requestTimeout < DateTime.Now) {
NextPeer ();

if (currentId != null && Stream != null) {
RequestNextNeededPiece (currentId);
}
}

foreach (PeerId id in Manager.Peers.ConnectedPeers)
if (id.SupportsLTMessages && id.ExtensionSupports.Supports (LTMetadata.Support.Name))
RequestNextNeededPiece (id);
}

async void SendAnnounces ()
Expand All @@ -104,60 +154,33 @@ await Task.WhenAll (
}
}

void NextPeer ()
{
bool flag = false;

foreach (PeerId id in Manager.Peers.ConnectedPeers) {
if (id.SupportsLTMessages && id.ExtensionSupports.Supports (LTMetadata.Support.Name)) {
if (id == currentId)
flag = true;
else if (flag) {
currentId = id;
return;
}
}
}
//second pass without removing the currentid and previous ones
foreach (PeerId id in Manager.Peers.ConnectedPeers) {
if (id.SupportsLTMessages && id.ExtensionSupports.Supports (LTMetadata.Support.Name)) {
currentId = id;
return;
}
}
currentId = null;
return;
}

protected override void HandleLtMetadataMessage (PeerId id, LTMetadata message)
{
base.HandleLtMetadataMessage (id, message);

if (Requester is null || RequesterData is null || Stream is null)
return;

switch (message.MetadataMessageType) {
case LTMetadata.MessageType.Data:
if (Stream is null || bitField is null)
throw new Exception ("Need extention handshake before ut_metadata message.");

// If we've already received everything successfully, do nothing!
if (bitField.AllTrue)
if (!Requester.ValidatePiece (id, new PieceSegment (0, message.Piece), out bool pieceComplete, out IList<IPeer> peersInvolved))
return;

Stream.Seek (message.Piece * LTMetadata.BlockSize, SeekOrigin.Begin);
Stream.Write (message.MetadataPiece, 0, message.MetadataPiece.Length);
bitField[message.Piece] = true;
if (bitField.AllTrue) {
message.MetadataPiece.CopyTo (Stream.AsMemory (message.Piece * LTMetadata.BlockSize));
if (pieceComplete) {
InfoHash? v1InfoHash;
InfoHash? v2InfoHash;
Stream.Position = 0;

using (SHA1 hasher = SHA1.Create ())
v1InfoHash = InfoHash.FromMemory (hasher.ComputeHash (Stream));

using (SHA256 hasher = SHA256.Create ())
v2InfoHash = InfoHash.FromMemory (hasher.ComputeHash (Stream));

if (!Manager.InfoHashes.Contains (v1InfoHash) && !Manager.InfoHashes.Contains (v2InfoHash)) {
bitField.SetAll (false);
// Do nothing. As the piece has been marked as 'complete' by the picker, the internal picker state has dropped all references to the piece.
// We'll automatically retry downloading all pieces now.
} else {
Stream.Position = 0;
BEncodedDictionary dict = new BEncodedDictionary ();
dict.Add ("info", BEncodedValue.Decode (Stream));

Expand All @@ -177,7 +200,11 @@ protected override void HandleLtMetadataMessage (PeerId id, LTMetadata message)
}
var rawData = dict.Encode ();
if (Torrent.TryLoad (rawData, out Torrent? t)) {
Requester = null;
RequesterData = null;

if (stopWhenDone) {
Manager.SetMetadata (t);
Manager.RaiseMetadataReceived (rawData);
return;
}
Expand All @@ -198,7 +225,8 @@ protected override void HandleLtMetadataMessage (PeerId id, LTMetadata message)
_ = Manager.StartAsync ();
Manager.RaiseMetadataReceived (rawData);
} else {
bitField.SetAll (false);
// Do nothing. As the piece has been marked as 'complete' by the picker, the internal picker state has dropped all references to the piece.
// We'll automatically retry downloading all pieces now.
}
}
}
Expand Down Expand Up @@ -243,19 +271,12 @@ protected override void HandleInterestedMessage (PeerId id, InterestedMessage me
// Nothing
}

int pieceToRequest;
void RequestNextNeededPiece (PeerId id)
{
if (bitField is null || bitField.AllTrue)
if (Requester is null )
return;

while (bitField[pieceToRequest % bitField.Length])
pieceToRequest++;

pieceToRequest = pieceToRequest % bitField.Length;
var m = new LTMetadata (id.ExtensionSupports, LTMetadata.MessageType.Request, pieceToRequest++);
id.MessageQueue.Enqueue (m);
requestTimeout = DateTime.Now.Add (timeout);
Requester.AddRequests (id, id.BitField, Array.Empty<ReadOnlyBitField> ());
}

protected override void AppendBitfieldMessage (PeerId id, MessageBundle bundle)
Expand Down Expand Up @@ -283,19 +304,16 @@ protected override void HandleExtendedHandshakeMessage (PeerId id, ExtendedHands
if (id.ExtensionSupports.Supports (LTMetadata.Support.Name)) {
var metadataSize = message.MetadataSize.GetValueOrDefault (0);
if (Stream == null && metadataSize > 0) {
Stream = new MemoryStream (new byte[metadataSize], 0, metadataSize, true, true);
int size = metadataSize % LTMetadata.BlockSize;
if (size > 0)
size = 1;
size += metadataSize / LTMetadata.BlockSize;
bitField = new BitField (size);
Stream = new byte[metadataSize];
Requester = Manager.Engine!.Factories.CreatePieceRequester (new PieceRequesterSettings (false, false, false, ignoreBitFieldAndChokeState: true));
RequesterData = new MetadataData (metadataSize);
Requester.Initialise (RequesterData, RequesterData, Array.Empty<ReadOnlyBitField> ());
}

// We only create the Stream if the remote peer has sent the metadata size key in their handshake.
// There's no guarantee the remote peer has the metadata yet, so even though they support metadata
// mode they might not be able to share the data.
if (Stream != null)
RequestNextNeededPiece (id);
RequestNextNeededPiece (id);
}
}

Expand Down
Loading