[core] Optimise TorrentCreator performance
Use some fancier threading here to ensure disk IO happens in
parallel with hashing data.
alanmcgovern committed Jul 3, 2022
1 parent 388aacf commit c539bc7
Showing 4 changed files with 100 additions and 31 deletions.
29 changes: 23 additions & 6 deletions src/MonoTorrent.Client/MonoTorrent.Client/Managers/DiskManager.cs
@@ -355,15 +355,32 @@ internal async ReusableTask<bool> GetHashAsync (ITorrentManagerInfo manager, int
             // Note that 'startOffset' may not be the very start of the piece if we have a partial hash.
             int startOffset = incrementalHash.NextOffsetToHash;
             int endOffset = manager.TorrentInfo!.BytesPerPiece (pieceIndex);
-            using (BufferPool.Rent (Constants.BlockSize, out Memory<byte> hashBuffer)) {
+            using (BufferPool.Rent (Constants.BlockSize, out Memory<byte> readingBuffer))
+            using (BufferPool.Rent (Constants.BlockSize, out Memory<byte> hashingBuffer)) {
                 try {
-
+                    int hashingCount = 0;
+                    int readingCount = Math.Min (Constants.BlockSize, endOffset - startOffset);
+                    var readingTask = ReadAsync (manager, new BlockInfo (pieceIndex, startOffset, readingCount), readingBuffer.Slice (0, readingCount));
                     while (startOffset != endOffset) {
-                        int count = Math.Min (Constants.BlockSize, endOffset - startOffset);
-                        if (!await ReadAsync (manager, new BlockInfo (pieceIndex, startOffset, count), hashBuffer.Slice (0, count)).ConfigureAwait (false))
+                        // Wait for the reading task to complete.
+                        if (!await readingTask.ConfigureAwait (false)) {
                             return false;
-                        startOffset += count;
-                        incrementalHash.AppendData (hashBuffer.Span.Slice (0, count));
+                        } else {
+                            // Great success. Increment 'startOffset' by the amount of data we just read
+                            startOffset += readingCount;
+
+                            // Now flip the buffers so we can asynchronously read the next block while we hash the current block.
+                            (readingBuffer, hashingBuffer) = (hashingBuffer, readingBuffer);
+                            (readingCount, hashingCount) = (hashingCount, readingCount);
+
+                            // Begin the next read into the reading buffer.
+                            readingCount = Math.Min (Constants.BlockSize, endOffset - startOffset);
+                            if (readingCount > 0)
+                                readingTask = ReadAsync (manager, new BlockInfo (pieceIndex, startOffset, readingCount), readingBuffer.Slice (0, readingCount));
+                        }
+
+                        // While the prior async read is completing, hash the data
+                        incrementalHash.AppendData (hashingBuffer.Span.Slice (0, hashingCount));
                     }
                     return incrementalHash.TryGetHashAndReset (dest);
                 } finally {
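The core trick above is plain double buffering: one buffer is being filled by an asynchronous read while the previous block is hashed, then the two are swapped. A minimal standalone sketch of the same pattern, assuming an ordinary FileStream and SHA1 rather than MonoTorrent's DiskManager and BufferPool:

    using System.IO;
    using System.Security.Cryptography;
    using System.Threading.Tasks;

    static class DoubleBufferedHasher
    {
        public static async Task<byte[]> HashAsync (string path, int blockSize = 16384)
        {
            using var stream = new FileStream (path, FileMode.Open, FileAccess.Read, FileShare.Read, blockSize, useAsync: true);
            using var hasher = IncrementalHash.CreateHash (HashAlgorithmName.SHA1);
            var readingBuffer = new byte[blockSize];
            var hashingBuffer = new byte[blockSize];

            // Kick off the first read before entering the loop.
            var readingTask = stream.ReadAsync (readingBuffer, 0, blockSize);
            while (true) {
                int read = await readingTask.ConfigureAwait (false);
                if (read == 0)
                    break;

                // Flip the buffers: the block just read becomes the block to hash,
                // and the other buffer is free for the next read.
                (readingBuffer, hashingBuffer) = (hashingBuffer, readingBuffer);

                // Start the next read, then hash the current block while it is in flight.
                readingTask = stream.ReadAsync (readingBuffer, 0, blockSize);
                hasher.AppendData (hashingBuffer, 0, read);
            }
            return hasher.GetHashAndReset ();
        }
    }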
68 changes: 49 additions & 19 deletions src/MonoTorrent.Client/MonoTorrent/TorrentCreator.cs
Expand Up @@ -32,6 +32,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
@@ -131,12 +132,7 @@ public static int RecommendedPieceSize (IEnumerable<FileMapping> files)
         /// </summary>
         public bool StoreMD5 { get; set; }

-        /// <summary>
-        /// A SHA1 checksum will be generated for each file when this is set to <see langword="true"/>. This
-        /// value is not used to verify file data, but can be used to aid de-duplicating files across torrents.
-        /// Defaults to false.
-        /// </summary>
-        public bool StoreSHA1 { get; set; }
+        bool StoreSHA1 => Type == TorrentType.V1OnlyWithPaddingFiles || Type == TorrentType.V1V2Hybrid;

         /// <summary>
         /// Determines whether
@@ -284,8 +280,8 @@ internal async Task<BEncodedDictionary> CreateAsync (string name, ITorrentFileSo
             if (merkleLayers.Count > 0) {
                 var dict = new BEncodedDictionary ();
                 foreach (var kvp in merkleLayers.Where (t => t.Key.StartPieceIndex != t.Key.EndPieceIndex)) {
-                    var merkle = ReadOnlyMerkleLayers.FromLayer (PieceLength, kvp.Value.Span);
-                    dict[BEncodedString.FromMemory (merkle.Root)] = BEncodedString.FromMemory (kvp.Value);
+                    var rootHash = MerkleHash.Hash (kvp.Value.Span, PieceLength);
+                    dict[BEncodedString.FromMemory (rootHash)] = BEncodedString.FromMemory (kvp.Value);
                 }
                 torrent["piece layers"] = dict;

@@ -314,9 +310,12 @@ void AppendFileTree (ITorrentManagerFile key, ReadOnlyMemory<byte> value, BEncod
                 }
                 fileTree = (BEncodedDictionary) inner;
             }
+            if (value.Length > 32)
+                value = MerkleHash.Hash (value.Span, PieceLength);
+
             var fileData = new BEncodedDictionary {
                 {"length", (BEncodedNumber) key.Length },
-                { "pieces root", value.Length == 32 ? BEncodedString.FromMemory (value) : BEncodedString.FromMemory (ReadOnlyMerkleLayers.FromLayer (PieceLength, value.Span).Root) }
+                { "pieces root", BEncodedString.FromMemory (value) }
             };

             fileTree.Add ("", fileData);
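For reference, a "pieces root" is the merkle root of a file's piece-layer hashes (BEP 52). A rough sketch of the pairwise reduction follows; it is not MonoTorrent's MerkleHash implementation, and real BEP 52 padding uses the root of an all-zero subtree (which depends on the piece length, presumably why PieceLength is passed in) rather than the zeroed hash used here:

    using System.Collections.Generic;
    using System.Linq;
    using System.Security.Cryptography;

    static class MerkleRootSketch
    {
        // Reduce a non-empty layer of 32-byte SHA-256 hashes to a single root by
        // hashing adjacent pairs. The layer is padded to a power of two first;
        // a zeroed hash stands in for the real BEP 52 padding value.
        public static byte[] Compute (IReadOnlyList<byte[]> layer)
        {
            var current = new List<byte[]> (layer);
            while ((current.Count & (current.Count - 1)) != 0)
                current.Add (new byte[32]);

            using var sha256 = SHA256.Create ();
            while (current.Count > 1) {
                var next = new List<byte[]> (current.Count / 2);
                for (int i = 0; i < current.Count; i += 2)
                    next.Add (sha256.ComputeHash (current[i].Concat (current[i + 1]).ToArray ()));
                current = next;
            }
            return current[0];
        }
    }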
@@ -369,7 +368,7 @@ void AddCommonStuff (BEncodedDictionary torrent)
             CalcPiecesHash (ITorrentManagerInfo manager, CancellationToken token)
         {
             var settings = new EngineSettingsBuilder {
-                DiskCacheBytes = PieceLength,
+                DiskCacheBytes = PieceLength * 2, // Store the currently processing piece and allow prepping the next piece
                 DiskCachePolicy = StoreMD5 || StoreSHA1 ? CachePolicy.ReadsAndWrites : CachePolicy.WritesOnly
             }.ToSettings ();

@@ -392,16 +391,34 @@ void AddCommonStuff (BEncodedDictionary torrent)
             // All files will be merkle hashed into individual merkle trees
             Memory<byte> merkleHashes = Type.HasV2 () ? new byte[pieceCount * 32] : Array.Empty<byte> ();

+            var hashes = new PieceHash (sha1Hashes.IsEmpty ? sha1Hashes : sha1Hashes.Slice (0, 20), merkleHashes.IsEmpty ? merkleHashes : merkleHashes.Slice (0, 32));
+            var currentPiece = diskManager.GetHashAsync (manager, 0, hashes).ConfigureAwait (false);
+            var previousAppend = ReusableTask.CompletedTask;
             for (int piece = 0; piece < pieceCount; piece++) {
                 token.ThrowIfCancellationRequested ();
-                var hashes = new PieceHash (sha1Hashes.IsEmpty ? sha1Hashes : sha1Hashes.Slice (piece * 20, 20), merkleHashes.IsEmpty ? merkleHashes : merkleHashes.Slice (piece * 32, 32));
-                await diskManager.GetHashAsync (manager, piece, hashes);
-                for (int i = 0; i < torrentInfo.BlocksPerPiece (piece); i++) {
-                    var buffer = reusableBlockBuffer.Slice (0, torrentInfo.BytesPerBlock (piece, i));
-                    await diskManager.ReadAsync (manager, new BlockInfo (piece, i * Constants.BlockSize, buffer.Length), reusableBlockBuffer);
-                    AppendPerFileHashes (manager, fileMD5, fileMD5Hashes, fileSHA1, fileSHA1Hashes, (long) torrentInfo.PieceLength * piece + i * Constants.BlockSize, buffer);
+                // Wait for the current piece's async read to complete. When this task completes, the V1 and/or V2 hash will
+                // be stored in the 'hashes' object
+                await currentPiece;
+
+                // Asynchronously begin reading the *next* piece and computing the hash for that piece.
+                var nextPiece = piece + 1;
+                if (nextPiece < pieceCount) {
+                    hashes = new PieceHash (sha1Hashes.IsEmpty ? sha1Hashes : sha1Hashes.Slice (nextPiece * 20, 20), merkleHashes.IsEmpty ? merkleHashes : merkleHashes.Slice (nextPiece * 32, 32));
+                    currentPiece = diskManager.GetHashAsync (manager, nextPiece, hashes).ConfigureAwait (false);
+                }
+
+                // While we're computing the hash for 'piece + 1', we can compute the MD5 and/or SHA1 for the specific file
+                // being hashed.
+                if (StoreMD5 || StoreSHA1) {
+                    for (int i = 0; i < torrentInfo.BlocksPerPiece (piece); i++) {
+                        var buffer = reusableBlockBuffer.Slice (0, torrentInfo.BytesPerBlock (piece, i));
+                        await diskManager.ReadAsync (manager, new BlockInfo (piece, i * Constants.BlockSize, buffer.Length), reusableBlockBuffer).ConfigureAwait (false);
+                        await AppendPerFileHashes (manager, fileMD5, fileMD5Hashes, fileSHA1, fileSHA1Hashes, (long) torrentInfo.PieceLength * piece + i * Constants.BlockSize, buffer).ConfigureAwait (false);
+                    }
                 }
             }
+            // Ensure the final block from the final piece is hashed.
+            await previousAppend;

             var merkleLayers = new Dictionary<ITorrentManagerFile, ReadOnlyMemory<byte>> ();
             if (merkleHashes.Length > 0) {
@@ -412,7 +429,7 @@ void AddCommonStuff (BEncodedDictionary torrent)
             return (sha1Hashes, merkleLayers, fileSHA1Hashes, fileMD5Hashes);
         }

-        void AppendPerFileHashes (ITorrentManagerInfo manager, IncrementalHash? fileMD5, Dictionary<ITorrentManagerFile, ReadOnlyMemory<byte>> fileMD5Hashes, IncrementalHash? fileSHA1, Dictionary<ITorrentManagerFile, ReadOnlyMemory<byte>> fileSHA1Hashes, long offset, Memory<byte> buffer)
+        async ReusableTask AppendPerFileHashes (ITorrentManagerInfo manager, IncrementalHash? fileMD5, Dictionary<ITorrentManagerFile, ReadOnlyMemory<byte>> fileMD5Hashes, IncrementalHash? fileSHA1, Dictionary<ITorrentManagerFile, ReadOnlyMemory<byte>> fileSHA1Hashes, long offset, Memory<byte> buffer)
         {
             while (buffer.Length > 0) {
                 var fileIndex = manager.Files.FindFileByOffset (offset);
@@ -435,8 +452,21 @@ void AppendPerFileHashes (ITorrentManagerInfo manager, IncrementalHash? fileMD5,
                     if (!(fileMD5 is null))
                         fileMD5Hashes.Add (file, fileMD5.GetHashAndReset ());
                 } else {
-                    fileMD5?.AppendData (buffer.Span);
-                    fileSHA1?.AppendData (buffer.Span);
+                    static async ReusableTask AsyncHash (IncrementalHash hash, Memory<byte> buffer)
+                    {
+                        await MainLoop.SwitchThread ();
+                        hash.AppendData (buffer.Span);
+                    }
+                    if (!(fileMD5 is null) && !(fileSHA1 is null)) {
+                        // Both of these are non-null, so hash both in parallel
+                        var t1 = fileMD5 is null ? ReusableTask.CompletedTask : AsyncHash (fileMD5, buffer);
+                        var t2 = fileSHA1 is null ? ReusableTask.CompletedTask : AsyncHash (fileSHA1, buffer);
+                        await t1.ConfigureAwait (false);
+                        await t2.ConfigureAwait (false);
+                    } else {
+                        // Only one of them is non-null, so no fancy threading needed.
+                        (fileMD5 ?? fileSHA1)!.AppendData (buffer.Span);
+                    }
                     break;
                 }
             }
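MainLoop.SwitchThread and ReusableTask above are MonoTorrent internals. The same "one buffer, two digests in parallel" idea can be sketched with plain TPL primitives (an illustration, not the library's API):

    using System;
    using System.Security.Cryptography;
    using System.Threading.Tasks;

    static class ParallelDigests
    {
        // Append one buffer to an MD5 and a SHA1 incremental hash concurrently.
        // Each IncrementalHash is touched by exactly one task and the buffer is
        // only read, so no extra synchronisation is needed.
        public static Task AppendAsync (IncrementalHash md5, IncrementalHash sha1, ReadOnlyMemory<byte> buffer)
        {
            var t1 = Task.Run (() => md5.AppendData (buffer.Span));
            var t2 = Task.Run (() => sha1.AppendData (buffer.Span));
            return Task.WhenAll (t1, t2);
        }
    }

Offloading to the thread pool only pays for itself when both digests are needed, which is why the committed code keeps a synchronous fast path for the single-hash case.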
13 changes: 12 additions & 1 deletion FileStreamBuffer.cs
@@ -39,6 +39,17 @@ namespace MonoTorrent.PieceWriter
 {
     class FileStreamBuffer : IDisposable
     {
+        class Comparer : IEqualityComparer<ITorrentManagerFile>
+        {
+            public static Comparer Instance { get; } = new Comparer ();
+
+            public bool Equals (ITorrentManagerFile? x, ITorrentManagerFile? y)
+                => x == y;
+
+            public int GetHashCode (ITorrentManagerFile obj)
+                => obj.GetHashCode ();
+        }
+
         internal readonly struct RentedStream : IDisposable
         {
             internal readonly Stream? Stream;
@@ -77,7 +88,7 @@ internal FileStreamBuffer (Func<ITorrentManagerFile, FileAccess, Stream> streamC
         {
             StreamCreator = streamCreator;
             MaxStreams = maxStreams;
-            Streams = new Dictionary<ITorrentManagerFile, StreamData> (maxStreams);
+            Streams = new Dictionary<ITorrentManagerFile, StreamData> (maxStreams, Comparer.Instance);
         }

         internal async ReusableTask<bool> CloseStreamAsync (ITorrentManagerFile file)
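The nested Comparer forces the stream cache to key on object identity rather than whatever Equals/GetHashCode overrides an ITorrentManagerFile implementation might supply. A generic version of the same pattern is below; unlike the committed comparer it uses RuntimeHelpers.GetHashCode, which ignores GetHashCode overrides entirely (.NET 5+ also ships a ready-made ReferenceEqualityComparer):

    using System.Collections.Generic;
    using System.Runtime.CompilerServices;

    sealed class IdentityComparer<T> : IEqualityComparer<T> where T : class
    {
        public static IdentityComparer<T> Instance { get; } = new IdentityComparer<T> ();

        public bool Equals (T? x, T? y) => ReferenceEquals (x, y);

        // Hash by identity, ignoring any GetHashCode override on T.
        public int GetHashCode (T obj) => RuntimeHelpers.GetHashCode (obj);

        // Usage:
        // new Dictionary<ITorrentManagerFile, StreamData> (capacity, IdentityComparer<ITorrentManagerFile>.Instance);
    }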
21 changes: 16 additions & 5 deletions TorrentCreatorTests.cs
@@ -49,7 +49,6 @@ private static async Task<BEncodedDictionary> CreateTestBenc (TorrentType type,
             var creator = new TorrentCreator (type, Factories.Default
                 .WithPieceWriterCreator (maxOpenFiles => new TestWriter { DontWrite = false, FillValue = 0 })) {
                 StoreMD5 = true,
-                StoreSHA1 = true,
             };

             var announces = new List<List<string>> {
@@ -127,20 +126,32 @@ public async Task MD5HashNotAffectedByPadding ()
                 var fileA = (BEncodedDictionary) filesA[0];
                 long lengthA = ((BEncodedNumber) fileA[(BEncodedString) "length"]).Number;
                 var md5sumA = ((BEncodedString) fileA[(BEncodedString) "md5sum"]);
-                var sha1sumA = ((BEncodedString) fileA[(BEncodedString) "sha1"]);

                 var fileB = (BEncodedDictionary) filesB[0];
                 long lengthB = ((BEncodedNumber) fileB[(BEncodedString) "length"]).Number;
                 var md5sumB = ((BEncodedString) fileB[(BEncodedString) "md5sum"]);
-                var sha1sumB = ((BEncodedString) fileB[(BEncodedString) "sha1"]);

                 Assert.AreEqual (lengthA, lengthB);
                 Assert.AreEqual (md5sumA, md5sumB);
-                Assert.AreEqual (sha1sumA, sha1sumB);

-                Assert.AreEqual (SHA1SumZeros (lengthA), sha1sumA);
                 Assert.AreEqual (MD5SumZeros (lengthA), md5sumA);
             }
         }

+        [Test]
+        public async Task SHA1HashNotAffectedByPadding ()
+        {
+            var paddedBenc = await CreateTestBenc (TorrentType.V1OnlyWithPaddingFiles);
+
+            var infoA = (BEncodedDictionary) paddedBenc[(BEncodedString) "info"];
+            var filesA = (BEncodedList) infoA[(BEncodedString) "files"];
+
+            for (int i = 0; i < filesA.Count; i++) {
+                var fileA = (BEncodedDictionary) filesA[0];
+                long lengthA = ((BEncodedNumber) fileA[(BEncodedString) "length"]).Number;
+                var sha1sumA = ((BEncodedString) fileA[(BEncodedString) "sha1"]);
+
+                Assert.AreEqual (SHA1SumZeros (lengthA), sha1sumA);
+            }
+        }
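The SHA1SumZeros and MD5SumZeros helpers asserted against above are outside this diff. Since the TestWriter fills every file with zeros (FillValue = 0), a hypothetical reconstruction simply hashes 'length' zero bytes:

    using System;
    using System.Security.Cryptography;
    using MonoTorrent.BEncoding;

    static class ZeroSums
    {
        // SHA1 of 'length' zero bytes: padding files must never change the
        // checksum of the file itself, so this is the expected value.
        public static BEncodedString SHA1SumZeros (long length)
        {
            using var hasher = IncrementalHash.CreateHash (HashAlgorithmName.SHA1);
            var block = new byte[16384]; // all zeros
            while (length > 0) {
                int count = (int) Math.Min (block.Length, length);
                hasher.AppendData (block, 0, count);
                length -= count;
            }
            return BEncodedString.FromMemory (hasher.GetHashAndReset ());
        }
    }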
