Skip to content

Commit

Permalink
add write_through option to disk_io_write_mode, which will attempt to…
Browse files Browse the repository at this point in the history
… flush pieces to disk once they're validated (with msync). Also make disable_os_cache also flush write blocks and mark all cache blocks as cold
  • Loading branch information
arvidn committed Feb 17, 2022
1 parent 1f39005 commit a45ead2
Show file tree
Hide file tree
Showing 17 changed files with 536 additions and 179 deletions.
1 change: 1 addition & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* add write_through disk_io_write_mode, which flushes pieces to disk immediately
* improve copy file function to preserve sparse regions (when supported)
* add function to truncate over-sized files part of a torrent
* fix directory creation on windows shared folders
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/session_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void bind_session_settings()
.value("disable_os_cache_for_aligned_files", settings_pack::disable_os_cache_for_aligned_files)
#endif
.value("disable_os_cache", settings_pack::disable_os_cache)
.value("write_through", settings_pack::write_through)
;

enum_<settings_pack::bandwidth_mixed_algo_t>("bandwidth_mixed_algo_t")
Expand Down
1 change: 1 addition & 0 deletions examples/client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ void assign_setting(lt::settings_pack& settings, std::string const& key, char co
{"anti_leech"_sv, settings_pack::anti_leech},
{"enable_os_cache"_sv, settings_pack::enable_os_cache},
{"disable_os_cache"_sv, settings_pack::disable_os_cache},
{"write_through"_sv, settings_pack::write_through},
{"prefer_tcp"_sv, settings_pack::prefer_tcp},
{"peer_proportional"_sv, settings_pack::peer_proportional},
{"pe_forced"_sv, settings_pack::pe_forced},
Expand Down
11 changes: 11 additions & 0 deletions include/libtorrent/aux_/mmap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ namespace aux {
// anytime soon
void dont_need(span<byte const> range);

// hint the kernel that the given (dirty) range of pages should be
// flushed to disk
void page_out(span<byte const> range);

std::int64_t m_size;
#if TORRENT_HAVE_MAP_VIEW_OF_FILE
file_mapping_handle m_file;
Expand Down Expand Up @@ -185,6 +189,13 @@ namespace aux {
m_mapping->dont_need(range);
}

void page_out(span<byte const> range)
{
TORRENT_ASSERT(m_mapping);
m_mapping->page_out(range);
}


private:
explicit file_view(std::shared_ptr<file_mapping> m) : m_mapping(std::move(m)) {}
std::shared_ptr<file_mapping> m_mapping;
Expand Down
4 changes: 4 additions & 0 deletions include/libtorrent/disk_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ namespace file_open_mode {
// hash does not need to be computed.
static constexpr disk_job_flags_t v1_hash = 5_bit;

// this flag instructs a hash job that we just completed this piece, and
// it should be flushed to disk
static constexpr disk_job_flags_t flush_piece = 7_bit;

// this is called when a new torrent is added. The shared_ptr can be
// used to hold the internal torrent object alive as long as there are
// outstanding disk operations on the storage.
Expand Down
16 changes: 10 additions & 6 deletions include/libtorrent/mmap_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,19 @@ namespace aux {
bool tick();

int readv(settings_interface const&, span<iovec_t const> bufs
, piece_index_t piece, int offset, aux::open_mode_t flags, storage_error&);
, piece_index_t piece, int offset, aux::open_mode_t mode
, disk_job_flags_t flags
, storage_error&);
int writev(settings_interface const&, span<iovec_t const> bufs
, piece_index_t piece, int offset, aux::open_mode_t flags, storage_error&);
, piece_index_t piece, int offset, aux::open_mode_t mode
, disk_job_flags_t flags
, storage_error&);
int hashv(settings_interface const&, hasher& ph, std::ptrdiff_t len
, piece_index_t piece, int offset, aux::open_mode_t flags
, disk_job_flags_t mode, storage_error&);
, piece_index_t piece, int offset, aux::open_mode_t mode
, disk_job_flags_t flags, storage_error&);
int hashv2(settings_interface const&, hasher256& ph, std::ptrdiff_t len
, piece_index_t piece, int offset, aux::open_mode_t flags
, disk_job_flags_t mode, storage_error&);
, piece_index_t piece, int offset, aux::open_mode_t mode
, disk_job_flags_t flags, storage_error&);

// if the files in this storage are mapped, returns the mapped
// file_storage, otherwise returns the original file_storage object.
Expand Down
6 changes: 5 additions & 1 deletion include/libtorrent/settings_pack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,8 @@ namespace aux {
// potentially evict all other processes' cache by simply handling
// high throughput and large files. If libtorrent's read cache is
// disabled, enabling this may reduce performance.
// write_through
// flush pieces to disk as they complete validation.
//
// One reason to disable caching is that it may help the operating
// system from growing its file cache indefinitely.
Expand Down Expand Up @@ -2078,7 +2080,9 @@ namespace aux {
#else
deprecated_disable_os_cache_for_aligned_files = 1,
#endif
disable_os_cache = 2
disable_os_cache = 2,

write_through = 3,
};

enum bandwidth_mixed_algo_t : std::uint8_t
Expand Down
1 change: 1 addition & 0 deletions src/disk_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ constexpr disk_job_flags_t disk_interface::force_copy;
constexpr disk_job_flags_t disk_interface::sequential_access;
constexpr disk_job_flags_t disk_interface::volatile_read;
constexpr disk_job_flags_t disk_interface::v1_hash;
constexpr disk_job_flags_t disk_interface::flush_piece;

}
30 changes: 28 additions & 2 deletions src/mmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,9 @@ file_mapping::file_mapping(file_mapping&& rhs)

void file_mapping::dont_need(span<byte const> range)
{
TORRENT_UNUSED(range);
auto* const start = const_cast<byte*>(range.data());
auto const size = static_cast<std::size_t>(range.size());

#if TORRENT_USE_MADVISE
int const advise = 0
#if defined TORRENT_LINUX && defined MADV_COLD
Expand All @@ -705,10 +707,34 @@ void file_mapping::dont_need(span<byte const> range)
;

if (advise)
madvise(const_cast<byte*>(range.data()), static_cast<std::size_t>(range.size()), advise);
::madvise(start, size, advise);
#endif
#ifndef TORRENT_WINDOWS
::msync(start, size, MS_INVALIDATE);
#else
TORRENT_UNUSED(start);
TORRENT_UNUSED(size);
#endif
}

void file_mapping::page_out(span<byte const> range)
{
#if TORRENT_HAVE_MAP_VIEW_OF_FILE
// ignore errors, this is best-effort
FlushViewOfFile(range.data(), static_cast<std::size_t>(range.size()));
#else

auto* const start = const_cast<byte*>(range.data());
auto const size = static_cast<std::size_t>(range.size());
#if TORRENT_USE_MADVISE && defined MADV_PAGEOUT
::madvise(start, size, MADV_PAGEOUT);
#endif

::msync(start, size, MS_ASYNC);

#endif // MAP_VIEW_OF_FILE
}

} // aux
} // libtorrent

Expand Down
14 changes: 9 additions & 5 deletions src/mmap_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ namespace {
return (flags & ~(disk_interface::force_copy
| disk_interface::sequential_access
| disk_interface::volatile_read
| disk_interface::v1_hash))
| disk_interface::v1_hash
| disk_interface::flush_piece))
== disk_job_flags_t{};
}
#endif
Expand Down Expand Up @@ -618,7 +619,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(
iovec_t b = {buffer.data() + j->d.io.buffer_offset, j->d.io.buffer_size};

int const ret = j->storage->readv(m_settings, b
, j->piece, j->d.io.offset, file_mode, j->error);
, j->piece, j->d.io.offset, file_mode, j->flags, j->error);

TORRENT_ASSERT(ret >= 0 || j->error.ec);
TORRENT_UNUSED(ret);
Expand Down Expand Up @@ -653,7 +654,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(
iovec_t b = {buffer.data(), j->d.io.buffer_size};

int const ret = j->storage->readv(m_settings, b
, j->piece, j->d.io.offset, file_mode, j->error);
, j->piece, j->d.io.offset, file_mode, j->flags, j->error);

TORRENT_ASSERT(ret >= 0 || j->error.ec);
TORRENT_UNUSED(ret);
Expand Down Expand Up @@ -683,7 +684,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(

// the actual write operation
int const ret = j->storage->writev(m_settings, b
, j->piece, j->d.io.offset, file_mode, j->error);
, j->piece, j->d.io.offset, file_mode, j->flags, j->error);

m_stats_counters.inc_stats_counter(counters::num_writing_threads, -1);

Expand Down Expand Up @@ -1103,9 +1104,12 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> mmap_disk_io_constructor(
{
if (v1)
{
// if we will call hashv2() in a bit, don't trigger a flush
// just yet, let hashv2() do it
auto const flags = v2_block ? (j->flags & ~disk_interface::flush_piece) : j->flags;
j->error.ec.clear();
ret = j->storage->hashv(m_settings, h, len, j->piece, offset
, file_mode, j->flags, j->error);
, file_mode, flags, j->error);
if (ret < 0) break;
}
if (v2_block)
Expand Down
30 changes: 24 additions & 6 deletions src/mmap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,15 @@ error_code translate_error(std::system_error const& err, bool const write)
int mmap_storage::readv(settings_interface const& sett
, span<iovec_t const> bufs
, piece_index_t const piece, int const offset
, aux::open_mode_t const mode, storage_error& error)
, aux::open_mode_t const mode
, disk_job_flags_t const flags
, storage_error& error)
{
#ifdef TORRENT_SIMULATE_SLOW_READ
std::this_thread::sleep_for(seconds(1));
#endif
return readwritev(files(), bufs, piece, offset, error
, [this, mode, &sett](file_index_t const file_index
, [this, mode, flags, &sett](file_index_t const file_index
, std::int64_t const file_offset
, span<iovec_t const> vec, storage_error& ec)
{
Expand Down Expand Up @@ -549,6 +551,10 @@ error_code translate_error(std::system_error const& err, bool const write)
file_range = file_range.subspan(buf.size());
ret += static_cast<int>(buf.size());
}
if (flags & disk_interface::volatile_read)
handle->dont_need(file_range);
if (flags & disk_interface::flush_piece)
handle->page_out(file_range);
}
}
catch (std::system_error const& err)
Expand Down Expand Up @@ -576,10 +582,12 @@ error_code translate_error(std::system_error const& err, bool const write)
int mmap_storage::writev(settings_interface const& sett
, span<iovec_t const> bufs
, piece_index_t const piece, int const offset
, aux::open_mode_t const mode, storage_error& error)
, aux::open_mode_t const mode
, disk_job_flags_t const flags
, storage_error& error)
{
return readwritev(files(), bufs, piece, offset, error
, [this, mode, &sett](file_index_t const file_index
, [this, mode, flags, &sett](file_index_t const file_index
, std::int64_t const file_offset
, span<iovec_t const> vec, storage_error& ec)
{
Expand Down Expand Up @@ -639,6 +647,11 @@ error_code translate_error(std::system_error const& err, bool const write)
file_range = file_range.subspan(buf.size());
ret += static_cast<int>(buf.size());
}

if (flags & disk_interface::volatile_read)
handle->dont_need(file_range);
if (flags & disk_interface::flush_piece)
handle->page_out(file_range);
}
catch (std::system_error const& err)
{
Expand Down Expand Up @@ -730,6 +743,8 @@ error_code translate_error(std::system_error const& err, bool const write)
ret += static_cast<int>(file_range.size());
if (flags & disk_interface::volatile_read)
handle->dont_need(file_range);
if (flags & disk_interface::flush_piece)
handle->page_out(file_range);
}

return ret;
Expand Down Expand Up @@ -779,6 +794,8 @@ error_code translate_error(std::system_error const& err, bool const write)
ph.update(file_range);
if (flags & disk_interface::volatile_read)
handle->dont_need(file_range);
if (flags & disk_interface::flush_piece)
handle->page_out(file_range);

return static_cast<int>(file_range.size());
}
Expand Down Expand Up @@ -880,8 +897,9 @@ error_code translate_error(std::system_error const& err, bool const write)
}

// if we have a cache already, don't store the data twice by leaving it in the OS cache as well
if (sett.get_int(settings_pack::disk_io_write_mode)
== settings_pack::disable_os_cache)
auto const write_mode = sett.get_int(settings_pack::disk_io_write_mode);
if (write_mode == settings_pack::disable_os_cache
|| write_mode == settings_pack::write_through)
{
mode |= aux::open_mode::no_cache;
}
Expand Down
8 changes: 7 additions & 1 deletion src/settings_pack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ namespace libtorrent {
constexpr int CLOSE_FILE_INTERVAL = 240;
#else
constexpr int CLOSE_FILE_INTERVAL = 0;
#endif

#ifdef TORRENT_WINDOWS
constexpr int DISK_WRITE_MODE = settings_pack::write_through;
#else
constexpr int DISK_WRITE_MODE = settings_pack::enable_os_cache;
#endif

// tested to fail with _MSC_VER <= 1916. The actual version condition
Expand Down Expand Up @@ -274,7 +280,7 @@ constexpr int CLOSE_FILE_INTERVAL = 0;
DEPRECATED_SET(cache_size, 2048, nullptr),
DEPRECATED_SET(cache_buffer_chunk_size, 0, nullptr),
DEPRECATED_SET(cache_expiry, 300, nullptr),
SET(disk_io_write_mode, settings_pack::enable_os_cache, nullptr),
SET(disk_io_write_mode, DISK_WRITE_MODE, nullptr),
SET(disk_io_read_mode, settings_pack::enable_os_cache, nullptr),
SET(outgoing_port, 0, nullptr),
SET(num_outgoing_ports, 0, nullptr),
Expand Down
28 changes: 25 additions & 3 deletions src/torrent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,11 @@ bool is_downloading_state(int const st)
rp->blocks_left = blocks_in_piece;
rp->fail = false;

disk_job_flags_t flags{};
auto const read_mode = settings().get_int(settings_pack::disk_io_read_mode);
if (read_mode == settings_pack::disable_os_cache)
flags |= disk_interface::volatile_read;

peer_request r;
r.piece = piece;
r.start = 0;
Expand All @@ -773,7 +778,8 @@ bool is_downloading_state(int const st)
r.length = std::min(piece_size - r.start, block_size());
m_ses.disk_thread().async_read(m_storage, r
, [self, r, rp](disk_buffer_holder block, storage_error const& se) mutable
{ self->on_disk_read_complete(std::move(block), se, r, rp); });
{ self->on_disk_read_complete(std::move(block), se, r, rp); }
, flags);
}
m_ses.deferred_submit_jobs();
}
Expand Down Expand Up @@ -1308,8 +1314,16 @@ bool is_downloading_state(int const st)
p.length = std::min(piece_size - p.start, block_size());

m_stats_counters.inc_stats_counter(counters::queued_write_bytes, p.length);

disk_job_flags_t dflags{};

auto const write_mode = settings().get_int(settings_pack::disk_io_write_mode);
if (write_mode == settings_pack::disable_os_cache)
dflags |= disk_interface::flush_piece | disk_interface::volatile_read;

m_ses.disk_thread().async_write(m_storage, p, data + p.start, nullptr
, [self, p](storage_error const& error) { self->on_disk_write_complete(error, p); });
, [self, p](storage_error const& error) { self->on_disk_write_complete(error, p); }
, dflags);

bool const was_finished = picker().is_piece_finished(p.piece);
bool const multi = picker().num_peers(block) > 1;
Expand Down Expand Up @@ -11036,9 +11050,17 @@ namespace {
TORRENT_ASSERT(m_storage);
TORRENT_ASSERT(!m_picker->is_hashing(piece));

disk_job_flags_t flags;
// we just completed the piece, it should be flushed to disk
disk_job_flags_t flags{};

auto const write_mode = settings().get_int(settings_pack::disk_io_write_mode);
if (write_mode == settings_pack::write_through)
flags |= disk_interface::flush_piece;
else if (write_mode == settings_pack::disable_os_cache)
flags |= disk_interface::flush_piece | disk_interface::volatile_read;
if (torrent_file().info_hashes().has_v1())
flags |= disk_interface::v1_hash;

aux::vector<sha256_hash> hashes;
if (torrent_file().info_hashes().has_v2())
{
Expand Down
Loading

0 comments on commit a45ead2

Please sign in to comment.