Skip to content

Commit

Permalink
Downloader atomic snapshot dir, step 1 (#4085)
Browse files Browse the repository at this point in the history
* save

* save

* save
  • Loading branch information
AskAlexSharov authored May 6, 2022
1 parent 0fc99b7 commit f480865
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 201 deletions.
224 changes: 48 additions & 176 deletions cmd/downloader/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package downloader
import (
"context"
"fmt"
"net"
"runtime"
"sync"
"time"
Expand All @@ -18,39 +17,31 @@ import (
"golang.org/x/sync/semaphore"
)

type Protocols struct {
TorrentClient *torrent.Client
DB kv.RwDB
type Downloader struct {
torrentClient *torrent.Client
db kv.RwDB
cfg *torrentcfg.Cfg

statsLock *sync.RWMutex
stats AggStats
snapshotDir *dir.Rw
}

func portMustBeTCPAndUDPOpen(port int) error {
tcpAddr := &net.TCPAddr{
Port: port,
IP: net.ParseIP("127.0.0.1"),
}
ln, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return fmt.Errorf("please open port %d for TCP and UDP. %w", port, err)
}
_ = ln.Close()
udpAddr := &net.UDPAddr{
Port: port,
IP: net.ParseIP("127.0.0.1"),
}
ser, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return fmt.Errorf("please open port %d for UDP. %w", port, err)
}
_ = ser.Close()
return nil
type AggStats struct {
MetadataReady, FilesTotal int32
PeersUnique int32
ConnectionsTotal uint64

Completed bool
Progress float32

BytesCompleted, BytesTotal uint64

BytesDownload, BytesUpload uint64
UploadRate, DownloadRate uint64
}

func New(cfg *torrentcfg.Cfg, snapshotDir *dir.Rw) (*Protocols, error) {
func New(cfg *torrentcfg.Cfg, snapshotDir *dir.Rw) (*Downloader, error) {
if err := portMustBeTCPAndUDPOpen(cfg.ListenPort); err != nil {
return nil, err
}
Expand All @@ -70,45 +61,25 @@ func New(cfg *torrentcfg.Cfg, snapshotDir *dir.Rw) (*Protocols, error) {
}
}

return &Protocols{
return &Downloader{
cfg: cfg,
TorrentClient: torrentClient,
DB: cfg.DB,
torrentClient: torrentClient,
db: cfg.DB,
statsLock: &sync.RWMutex{},
snapshotDir: snapshotDir,
}, nil
}

func savePeerID(db kv.RwDB, peerID torrent.PeerID) error {
return db.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Put(kv.BittorrentInfo, []byte(kv.BittorrentPeerID), peerID[:])
})
}

func readPeerID(db kv.RoDB) (peerID []byte, err error) {
if err = db.View(context.Background(), func(tx kv.Tx) error {
peerIDFromDB, err := tx.GetOne(kv.BittorrentInfo, []byte(kv.BittorrentPeerID))
if err != nil {
return fmt.Errorf("get peer id: %w", err)
}
peerID = common2.Copy(peerIDFromDB)
return nil
}); err != nil {
return nil, err
}
return peerID, nil
}

func (cli *Protocols) Start(ctx context.Context, silent bool) error {
if err := BuildTorrentsAndAdd(ctx, cli.snapshotDir, cli.TorrentClient); err != nil {
func (d *Downloader) Start(ctx context.Context, silent bool) error {
if err := BuildTorrentsAndAdd(ctx, d.snapshotDir, d.torrentClient); err != nil {
return fmt.Errorf("BuildTorrentsAndAdd: %w", err)
}

var sem = semaphore.NewWeighted(int64(cli.cfg.DownloadSlots))
var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots))

go func() {
for {
torrents := cli.TorrentClient.Torrents()
torrents := d.Torrent().Torrents()
for _, t := range torrents {
<-t.GotInfo()
if t.Complete.Bool() {
Expand Down Expand Up @@ -144,14 +115,14 @@ func (cli *Protocols) Start(ctx context.Context, silent bool) error {
case <-ctx.Done():
return
case <-statEvery.C:
cli.ReCalcStats(interval)
d.ReCalcStats(interval)

case <-logEvery.C:
if silent {
continue
}

stats := cli.Stats()
stats := d.Stats()

if stats.MetadataReady < stats.FilesTotal {
log.Info(fmt.Sprintf("[Snapshots] Waiting for torrents metadata: %d/%d", stats.MetadataReady, stats.FilesTotal))
Expand All @@ -178,7 +149,7 @@ func (cli *Protocols) Start(ctx context.Context, silent bool) error {
"files", stats.FilesTotal,
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
if stats.PeersUnique == 0 {
ips := cli.TorrentClient.BadPeerIPs()
ips := d.Torrent().BadPeerIPs()
if len(ips) > 0 {
log.Info("[Snapshots] Stats", "banned", ips)
}
Expand All @@ -190,14 +161,14 @@ func (cli *Protocols) Start(ctx context.Context, silent bool) error {
return nil
}

func (cli *Protocols) ReCalcStats(interval time.Duration) {
cli.statsLock.Lock()
defer cli.statsLock.Unlock()
prevStats, stats := cli.stats, cli.stats
func (d *Downloader) ReCalcStats(interval time.Duration) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
prevStats, stats := d.stats, d.stats

peers := make(map[torrent.PeerID]struct{}, 16)
torrents := cli.TorrentClient.Torrents()
connStats := cli.TorrentClient.ConnStats()
torrents := d.torrentClient.Torrents()
connStats := d.torrentClient.ConnStats()

stats.Completed = true
stats.BytesDownload = uint64(connStats.BytesReadUsefulIntendedData.Int64())
Expand Down Expand Up @@ -234,43 +205,32 @@ func (cli *Protocols) ReCalcStats(interval time.Duration) {
stats.PeersUnique = int32(len(peers))
stats.FilesTotal = int32(len(torrents))

cli.stats = stats
d.stats = stats
}

func (cli *Protocols) Stats() AggStats {
cli.statsLock.RLock()
defer cli.statsLock.RUnlock()
return cli.stats
func (d *Downloader) Stats() AggStats {
d.statsLock.RLock()
defer d.statsLock.RUnlock()
return d.stats
}

func (cli *Protocols) Close() {
//for _, tr := range cli.TorrentClient.Torrents() {
// go func() {}()
// fmt.Printf("alex: CLOse01: %s\n", tr.Name())
// tr.DisallowDataUpload()
// fmt.Printf("alex: CLOse02: %s\n", tr.Name())
// tr.DisallowDataDownload()
// fmt.Printf("alex: CLOse03: %s\n", tr.Name())
// ch := t.Closed()
// tr.Drop()
// <-ch
//}
cli.TorrentClient.Close()
cli.DB.Close()
if cli.cfg.CompletionCloser != nil {
if err := cli.cfg.CompletionCloser.Close(); err != nil {
func (d *Downloader) Close() {
d.torrentClient.Close()
d.db.Close()
if d.cfg.CompletionCloser != nil {
if err := d.cfg.CompletionCloser.Close(); err != nil {
log.Warn("[Snapshots] CompletionCloser", "err", err)
}
}
}

func (cli *Protocols) PeerID() []byte {
peerID := cli.TorrentClient.PeerID()
func (d *Downloader) PeerID() []byte {
peerID := d.torrentClient.PeerID()
return peerID[:]
}

func (cli *Protocols) StopSeeding(hash metainfo.Hash) error {
t, ok := cli.TorrentClient.Torrent(hash)
func (d *Downloader) StopSeeding(hash metainfo.Hash) error {
t, ok := d.torrentClient.Torrent(hash)
if !ok {
return nil
}
Expand All @@ -280,94 +240,6 @@ func (cli *Protocols) StopSeeding(hash metainfo.Hash) error {
return nil
}

type AggStats struct {
MetadataReady, FilesTotal int32
PeersUnique int32
ConnectionsTotal uint64

Completed bool
Progress float32

BytesCompleted, BytesTotal uint64

BytesDownload, BytesUpload uint64
UploadRate, DownloadRate uint64
}

// AddTorrentFile - adding .torrent file to torrentClient (and checking their hashes), if .torrent file
// added first time - pieces verification process will start (disk IO heavy) - Progress
// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
// Don't need call torrent.VerifyData manually
func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return nil, err
}
mi.AnnounceList = Trackers
ts, err := torrent.TorrentSpecFromMetaInfoErr(mi)
if err != nil {
return nil, err
}
ts.ChunkSize = torrentcfg.DefaultNetworkChunkSize
t, _, err := torrentClient.AddTorrentSpec(ts)
if err != nil {
return nil, err
}
t.DisallowDataDownload()
t.AllowDataUpload()
return t, nil
}

func VerifyDtaFiles(ctx context.Context, snapshotDir string) error {
logEvery := time.NewTicker(5 * time.Second)
defer logEvery.Stop()
files, err := AllTorrentPaths(snapshotDir)
if err != nil {
return err
}
totalPieces := 0
for _, f := range files {
metaInfo, err := metainfo.LoadFromFile(f)
if err != nil {
return err
}
info, err := metaInfo.UnmarshalInfo()
if err != nil {
return err
}
totalPieces += info.NumPieces()
}

j := 0
for _, f := range files {
metaInfo, err := metainfo.LoadFromFile(f)
if err != nil {
return err
}
info, err := metaInfo.UnmarshalInfo()
if err != nil {
return err
}

err = verifyTorrent(&info, snapshotDir, func(i int, good bool) error {
j++
if !good {
log.Error("[Snapshots] Verify hash mismatch", "at piece", i, "file", f)
return fmt.Errorf("invalid file")
}
select {
case <-logEvery.C:
log.Info("[Snapshots] Verify", "Progress", fmt.Sprintf("%.2f%%", 100*float64(j)/float64(totalPieces)))
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
})
if err != nil {
return err
}
}
log.Info("[Snapshots] Verify succeed")
return nil
func (d *Downloader) Torrent() *torrent.Client {
return d.torrentClient
}
24 changes: 6 additions & 18 deletions cmd/downloader/downloader/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,36 @@ package downloader

import (
"context"
"errors"

"github.com/anacrolix/torrent/metainfo"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
)

var (
ErrNotSupportedNetworkID = errors.New("not supported network id")
ErrNotSupportedSnapshot = errors.New("not supported snapshot for this network id")
)
var (
_ proto_downloader.DownloaderServer = &GrpcServer{}
)

func NewGrpcServer(db kv.RwDB, client *Protocols, snapshotDir *dir.Rw) (*GrpcServer, error) {
sn := &GrpcServer{
db: db,
t: client,
snapshotDir: snapshotDir,
}
return sn, nil
func NewGrpcServer(d *Downloader, snapshotDir *dir.Rw) (*GrpcServer, error) {
return &GrpcServer{d: d, snapshotDir: snapshotDir}, nil
}

type GrpcServer struct {
proto_downloader.UnimplementedDownloaderServer
t *Protocols
db kv.RwDB
d *Downloader
snapshotDir *dir.Rw
}

func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
torrentClient := s.t.TorrentClient
torrentClient := s.d.Torrent()
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
for _, it := range request.Items {
if it.TorrentHash == nil {
err := BuildTorrentAndAdd(ctx, it.Path, s.snapshotDir, s.t.TorrentClient)
err := BuildTorrentAndAdd(ctx, it.Path, s.snapshotDir, torrentClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,7 +65,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
}

func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsRequest) (*proto_downloader.StatsReply, error) {
stats := s.t.Stats()
stats := s.d.Stats()
return &proto_downloader.StatsReply{
MetadataReady: stats.MetadataReady,
FilesTotal: stats.FilesTotal,
Expand Down
Loading

0 comments on commit f480865

Please sign in to comment.