Improved downloader webseed performance (#10715)
This contains fixes, mainly in erigontech/torrent, which improve the
parallelization of the library to reliably support download speeds above
~25MB per second.

`--torrent.download.rate=256mb --torrent.download.slots=32` should now
run a download to completion at a consistent ~250MB/second download
rate, assuming `64GB` of available memory.

If more memory is available, `--torrent.download.rate=512mb
--torrent.download.slots=48` will reach around ~400MB/second. It is not yet
clear where the loss in performance occurs, but for this version of the code
400MB/second appears to be roughly the maximum it can support.

# Outstanding Issues

The current version of the code is memory hungry at high bandwidths. The
reason for this is the way the HTTP data is handled under high load. The
buffer model is currently:

`http->hashing->mmap`

where both the HTTP connection and the torrent hasher retain intermediate
buffers until they are finally flushed to the memory-mapped file. A more
memory-efficient model would be for the HTTP connection to write directly to
the memory-mapped segment, which could then be hashed in place. This will
require further code modification, which is outside the scope of this change.
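
For illustration, here is a minimal sketch of the single-copy direction
described above. It is not the erigontech/torrent implementation:
`fetchPiece`, its parameters, the placeholder URL, and the use of a plain
seek-and-write in place of a real memory-mapped segment are all assumptions
made to keep the example self-contained.

```go
// Sketch only: stream an HTTP range/piece straight into its slot in the
// destination file while hashing it in flight. In the proposed model "dst"
// would be a view over the memory-mapped segment; seek-and-write stands in
// for that here.
package main

import (
	"crypto/sha1" // BitTorrent v1 piece hashes are SHA-1
	"fmt"
	"io"
	"net/http"
	"os"
)

// fetchPiece downloads pieceLength bytes from url and writes them directly at
// pieceOffset in dst, hashing the bytes as they pass through instead of
// retaining an intermediate per-piece buffer.
func fetchPiece(url string, dst *os.File, pieceOffset, pieceLength int64) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	hasher := sha1.New()
	body := io.TeeReader(io.LimitReader(resp.Body, pieceLength), hasher)

	if _, err := dst.Seek(pieceOffset, io.SeekStart); err != nil {
		return nil, err
	}
	if _, err := io.Copy(dst, body); err != nil {
		return nil, err
	}
	return hasher.Sum(nil), nil
}

func main() {
	f, err := os.CreateTemp("", "piece-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Placeholder URL and sizes; a real webseed request would target a piece range.
	sum, err := fetchPiece("https://example.com/", f, 0, 1<<20)
	if err != nil {
		panic(err)
	}
	fmt.Printf("piece hash: %x\n", sum)
}
```

Because the hash is computed as the bytes stream through `io.TeeReader`, the
only buffering left is the transport's own, rather than a retained per-piece
copy in both the HTTP layer and the hasher.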

# Downloader changes

Along with the torrent library changes, a number of changes have been made to
the downloader code. The most significant are:

* `d.webseeds.DownloadAndSaveTorrentFile` has been added to the post-processing
step after webseed torrents are downloaded. This is because it appears that,
without it, certain scenarios lead to a torrent's metadata never becoming
available (if the existing checks are made before the download is finished).
* `mdbxPieceCompletion` now has a `Flushed(infoHash infohash.T, flushed
*roaring.Bitmap)` method which is used to commit the completion status to the
db after an asynchronous flush of the mmap files has been made. This means the
completion state is only confirmed once the data is flushed. (This may lead to
re-downloading of pieces in the case of a crash.) A usage sketch follows this
list.
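
As an illustration of the intended call flow, the sketch below shows how a
storage-layer flush callback could report durably written pieces through
`Flushed`. Only the `Flushed(infoHash infohash.T, flushed *roaring.Bitmap)`
signature comes from this change; the `PieceFlusher` interface, `afterFlush`,
and the `logFlusher` stand-in are hypothetical scaffolding, and the standard
github.com module paths are assumed.

```go
// Sketch only: how an asynchronous mmap-flush completion might be reported to
// the piece-completion store via the new Flushed method.
package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
	"github.com/anacrolix/torrent/types/infohash"
)

// PieceFlusher is the slice of the piece-completion API exercised here; in
// this change the concrete type is mdbxPieceCompletion.
type PieceFlusher interface {
	Flushed(infoHash infohash.T, flushed *roaring.Bitmap)
}

// afterFlush is called once the mmap files for a torrent have been flushed:
// only then is the completion status of those pieces committed to the db,
// which is why a crash can lead to some pieces being re-downloaded.
func afterFlush(pc PieceFlusher, ih infohash.T, pieces []uint32) {
	flushed := roaring.New()
	for _, p := range pieces {
		flushed.Add(p)
	}
	pc.Flushed(ih, flushed)
}

// logFlusher is a stand-in implementation so the sketch runs on its own.
type logFlusher struct{}

func (logFlusher) Flushed(ih infohash.T, flushed *roaring.Bitmap) {
	fmt.Printf("flushed %d pieces of %s\n", flushed.GetCardinality(), ih.HexString())
}

func main() {
	afterFlush(logFlusher{}, infohash.T{}, []uint32{0, 1, 2})
}
```

The design trades a small risk of redundant downloads after a crash for the
guarantee that a piece is never marked complete in the db before its data has
reached disk.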

---------

Co-authored-by: alex.sharov <AskAlexSharov@gmail.com>
mh0lt and AskAlexSharov authored Jul 1, 2024
1 parent 4e2b026 commit 1b060dd
Showing 9 changed files with 147 additions and 129 deletions.
11 changes: 6 additions & 5 deletions erigon-lib/diagnostics/stages.go
@@ -102,11 +102,12 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetStagesList(info.StagesList)
d.mu.Unlock()

d.saveSyncStagesToDB()
func() {
d.mu.Lock()
defer d.mu.Unlock()
d.SetStagesList(info.StagesList)
d.saveSyncStagesToDB()
}()
}
}
}()
13 changes: 7 additions & 6 deletions erigon-lib/downloader/downloader.go
@@ -288,7 +288,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi

cfg.ClientConfig.WebTransport = requestHandler

db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig, cfg.MdbxWriteMap)
db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig, cfg.MdbxWriteMap, logger)
if err != nil {
return nil, fmt.Errorf("openClient: %w", err)
}
@@ -838,6 +838,7 @@ func (d *Downloader) mainLoop(silent bool) error {
if ok && err == nil {
_, _, err = addTorrentFile(d.ctx, ts, d.torrentClient, d.db, d.webseeds)
if err != nil {
d.logger.Warn("[snapshots] addTorrentFile from webseed", "err", err)
continue
}
}
@@ -1092,10 +1093,10 @@ func (d *Downloader) mainLoop(silent bool) error {
}
}

d.lock.RLock()
d.lock.Lock()
downloadingLen := len(d.downloading)
d.stats.Downloading = int32(downloadingLen)
d.lock.RUnlock()
d.lock.Unlock()

// the call interval of the loop (elapsed sec) used to get slots/sec for
// calculating the number of files to download based on the loop speed
@@ -2139,7 +2140,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
}

if !stats.Completed {
logger.Debug("[snapshots] info",
logger.Debug("[snapshots] download info",
"len", len(torrents),
"webTransfers", webTransfers,
"torrent", torrentInfo,
@@ -2717,7 +2718,7 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error {

func (d *Downloader) TorrentClient() *torrent.Client { return d.torrentClient }

func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientConfig, writeMap bool) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientConfig, writeMap bool, logger log.Logger) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
dbCfg := mdbx.NewMDBX(log.New()).
Label(kv.DownloaderDB).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }).
@@ -2735,7 +2736,7 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
}
//c, err = NewMdbxPieceCompletion(db)
c, err = NewMdbxPieceCompletionBatch(db)
c, err = NewMdbxPieceCompletion(db, logger)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err)
}
3 changes: 2 additions & 1 deletion erigon-lib/downloader/downloadercfg/downloadercfg.go
@@ -17,6 +17,7 @@
package downloadercfg

import (
"github.com/ledgerwatch/erigon-lib/common/dbg"
"net"
"net/url"
"os"
@@ -68,7 +69,7 @@ func Default() *torrent.ClientConfig {
// better don't increase because erigon periodically producing "new seedable files" - and adding them to downloader.
// it must not impact chain tip sync - so, limit resources to minimum by default.
// but when downloader is started as a separated process - rise it to max
//torrentConfig.PieceHashersPerTorrent = max(1, runtime.NumCPU()-1)
torrentConfig.PieceHashersPerTorrent = dbg.EnvInt("DL_HASHERS", min(16, max(2, runtime.NumCPU()-2)))

torrentConfig.MinDialTimeout = 6 * time.Second //default: 3s
torrentConfig.HandshakesTimeout = 8 * time.Second //default: 4s
165 changes: 101 additions & 64 deletions erigon-lib/downloader/mdbx_piece_completion.go
@@ -19,12 +19,15 @@ package downloader
import (
"context"
"encoding/binary"
"sync"

"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/log/v3"
)

const (
@@ -33,17 +36,37 @@
)

type mdbxPieceCompletion struct {
db kv.RwDB
db *mdbx.MdbxKV
mu sync.RWMutex
completed map[infohash.T]*roaring.Bitmap
flushed map[infohash.T]*roaring.Bitmap
logger log.Logger
}

var _ storage.PieceCompletion = (*mdbxPieceCompletion)(nil)

func NewMdbxPieceCompletion(db kv.RwDB) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletion{db: db}
func NewMdbxPieceCompletion(db kv.RwDB, logger log.Logger) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletion{
db: db.(*mdbx.MdbxKV),
logger: logger,
completed: map[infohash.T]*roaring.Bitmap{},
flushed: map[infohash.T]*roaring.Bitmap{}}
return
}

func (m mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
func (m *mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
m.mu.RLock()
if completed, ok := m.completed[pk.InfoHash]; ok {
if completed.Contains(uint32(pk.Index)) {
m.mu.RUnlock()
return storage.Completion{
Complete: true,
Ok: true,
}, nil
}
}
m.mu.RUnlock()

err = m.db.View(context.Background(), func(tx kv.Tx) error {
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
@@ -66,11 +89,14 @@ func (m mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, e
return
}

func (m mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
func (m *mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
}

m.mu.Lock()
defer m.mu.Unlock()

var tx kv.RwTx
var err error
// On power-off recent "no-sync" txs may be lost.
@@ -84,91 +110,102 @@ func (m mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
// 1K fsyncs/2minutes it's quite expensive, but even on cloud (high latency) drive it allow download 100mb/s
// and Erigon doesn't do anything when downloading snapshots
if b {
tx, err = m.db.BeginRwNosync(context.Background())
if err != nil {
return err
completed, ok := m.completed[pk.InfoHash]

if !ok {
completed = &roaring.Bitmap{}
m.completed[pk.InfoHash] = completed
}
} else {
tx, err = m.db.BeginRw(context.Background())
if err != nil {
return err

completed.Add(uint32(pk.Index))

if flushed, ok := m.flushed[pk.InfoHash]; !ok || !flushed.Contains(uint32(pk.Index)) {
return nil
}
}

tx, err = m.db.BeginRw(context.Background())
if err != nil {
return err
}

defer tx.Rollback()

var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
err = putCompletion(tx, pk.InfoHash, uint32(pk.Index), b)

v := []byte(incomplete)
if b {
v = []byte(complete)
}
err = tx.Put(kv.BittorrentCompletion, key[:], v)
if err != nil {
return err
}

return tx.Commit()
}

func (m *mdbxPieceCompletion) Close() error {
m.db.Close()
return nil
}
func putCompletion(tx kv.RwTx, infoHash infohash.T, index uint32, c bool) error {
var key [infohash.Size + 4]byte
copy(key[:], infoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], index)

type mdbxPieceCompletionBatch struct {
db *mdbx.MdbxKV
v := []byte(incomplete)
if c {
v = []byte(complete)
}
//fmt.Println("PUT", infoHash, index, c)
return tx.Put(kv.BittorrentCompletion, key[:], v)
}

var _ storage.PieceCompletion = (*mdbxPieceCompletionBatch)(nil)
func (m *mdbxPieceCompletion) Flushed(infoHash infohash.T, flushed *roaring.Bitmap) {
m.mu.Lock()
defer m.mu.Unlock()

func NewMdbxPieceCompletionBatch(db kv.RwDB) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletionBatch{db: db.(*mdbx.MdbxKV)}
return
tx, err := m.db.BeginRw(context.Background())

if err != nil {
m.logger.Warn("[snapshots] failed to flush piece completions", "hash", infoHash, err, err)
return
}

defer tx.Rollback()

m.putFlushed(tx, infoHash, flushed)

err = tx.Commit()

if err != nil {
m.logger.Warn("[snapshots] failed to flush piece completions", "hash", infoHash, err, err)
}
}

func (m *mdbxPieceCompletionBatch) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
err = m.db.View(context.Background(), func(tx kv.Tx) error {
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
cn.Ok = true
v, err := tx.GetOne(kv.BittorrentCompletion, key[:])
if err != nil {
return err
}
switch string(v) {
case complete:
cn.Complete = true
case incomplete:
cn.Complete = false
default:
cn.Ok = false
func (m *mdbxPieceCompletion) putFlushed(tx kv.RwTx, infoHash infohash.T, flushed *roaring.Bitmap) {
if completed, ok := m.completed[infoHash]; ok {
setters := flushed.Clone()
setters.And(completed)

if setters.GetCardinality() > 0 {
setters.Iterate(func(piece uint32) bool {
// TODO deal with error (? don't remove from bitset ?)
_ = putCompletion(tx, infoHash, piece, true)
return true
})
}
return nil
})
return
}

func (m *mdbxPieceCompletionBatch) Set(pk metainfo.PieceKey, b bool) error {
if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
completed.AndNot(setters)

if completed.IsEmpty() {
delete(m.completed, infoHash)
}
}
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))

v := []byte(incomplete)
if b {
v = []byte(complete)
allFlushed, ok := m.flushed[infoHash]

if !ok {
allFlushed = &roaring.Bitmap{}
m.flushed[infoHash] = allFlushed
}
return m.db.Batch(func(tx kv.RwTx) error {
return tx.Put(kv.BittorrentCompletion, key[:], v)
})

allFlushed.Or(flushed)
}

func (m *mdbxPieceCompletionBatch) Close() error {
func (m *mdbxPieceCompletion) Close() error {
m.db.Close()
return nil
}
28 changes: 2 additions & 26 deletions erigon-lib/downloader/mdbx_piece_completion_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon-lib/log/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -29,32 +30,7 @@

func TestMdbxPieceCompletion(t *testing.T) {
db := memdb.NewTestDownloaderDB(t)
pc, err := NewMdbxPieceCompletion(db)
require.NoError(t, err)
defer pc.Close()

pk := metainfo.PieceKey{}

b, err := pc.Get(pk)
require.NoError(t, err)
assert.False(t, b.Ok)

require.NoError(t, pc.Set(pk, false))

b, err = pc.Get(pk)
require.NoError(t, err)
assert.Equal(t, storage.Completion{Complete: false, Ok: true}, b)

require.NoError(t, pc.Set(pk, true))

b, err = pc.Get(pk)
require.NoError(t, err)
assert.Equal(t, storage.Completion{Complete: true, Ok: true}, b)
}

func TestMdbxPieceCompletionBatch(t *testing.T) {
db := memdb.NewTestDownloaderDB(t)
pc, err := NewMdbxPieceCompletionBatch(db)
pc, err := NewMdbxPieceCompletion(db, log.New())
require.NoError(t, err)
defer pc.Close()

8 changes: 4 additions & 4 deletions erigon-lib/go.mod
@@ -55,8 +55,9 @@ require (
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pion/udp v0.1.4 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/tools v0.22.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
modernc.org/libc v1.50.4 // indirect
modernc.org/memory v1.8.0 // indirect
modernc.org/sqlite v1.29.8 // indirect
@@ -142,15 +143,14 @@
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/net v0.26.0
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/mathutil v1.6.0 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
zombiezen.com/go/sqlite v0.13.1 // indirect
)

replace (
github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-10
github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-24
github.com/holiman/bloomfilter/v2 => github.com/AskAlexSharov/bloomfilter/v2 v2.0.8
github.com/tidwall/btree => github.com/AskAlexSharov/btree v1.6.2
)