Skip to content

Commit

Permalink
Merge pull request #436 from CortexFoundation/event
Browse files Browse the repository at this point in the history
caffe event added
  • Loading branch information
ucwong authored Jan 3, 2024
2 parents 3c4274b + 7ab415d commit bb5dc3b
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 261 deletions.
10 changes: 7 additions & 3 deletions backend/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,24 @@ func (tm *TorrentManager) Search(ctx context.Context, hex string, request uint64
downloadMeter.Mark(1)

if request == 0 {
// TODO
// sync create torrent
// tm.addInfoHash(hex, int64(request))
// return nil
}

return tm.commit(ctx, hex, request)
}

// Add torrent to the leeching loop
func (tm *TorrentManager) commit(ctx context.Context, hex string, request uint64) error {
select {
/*select {
case tm.taskChan <- types.NewBitsFlow(hex, request):
case <-ctx.Done():
return ctx.Err()
case <-tm.closeAll:
}
return nil
return nil */

return tm.taskEvent.Post(mainEvent{types.NewBitsFlow(hex, request)})
}
8 changes: 7 additions & 1 deletion backend/caffe/t.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/event"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/anacrolix/torrent"
//"github.com/anacrolix/torrent/metainfo"
Expand All @@ -46,6 +47,10 @@ func (t *Torrent) Birth() mclock.AbsTime {
return t.start
}

func (t *Torrent) Mux() *event.TypeMux {
return t.mux
}

func (t *Torrent) SetStart(s mclock.AbsTime) {
t.start = s
}
Expand Down Expand Up @@ -303,7 +308,8 @@ func (t *Torrent) Close() {
t.Lock()
defer t.Unlock()

defer t.Torrent.Drop()
t.Torrent.Drop()
t.mux.Stop()

log.Info("Nas closed", "ih", t.InfoHash(), "total", common.StorageSize(t.Torrent.Length()), "req", common.StorageSize(t.BytesRequested()), "finish", common.StorageSize(t.Torrent.BytesCompleted()), "status", t.Status(), "cited", t.Cited())
t = nil
Expand Down
8 changes: 8 additions & 0 deletions backend/caffe/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/event"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/torrentfs/params"
"github.com/anacrolix/torrent"
Expand Down Expand Up @@ -66,13 +67,19 @@ type Torrent struct {

//jobCh chan bool
priority int

mux *event.TypeMux
}

type task struct {
start int
end int
}

type TorrentEvent struct {
S int
}

func NewTorrent(t *torrent.Torrent, requested int64, ih string, path string, slot int, spec *torrent.TorrentSpec) *Torrent {
tor := Torrent{
Torrent: t,
Expand All @@ -85,6 +92,7 @@ func NewTorrent(t *torrent.Torrent, requested int64, ih string, path string, slo
closeAll: make(chan any),
slot: slot,
spec: spec,
mux: new(event.TypeMux),
}

tor.bytesRequested.Store(requested)
Expand Down
8 changes: 5 additions & 3 deletions backend/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/event"
"github.com/CortexFoundation/CortexTheseus/log"
"os"
"path/filepath"
Expand Down Expand Up @@ -58,14 +59,14 @@ func (tm *TorrentManager) ExistsOrActive(ctx context.Context, ih string, rawSize
}
}

func (tm *TorrentManager) GetFile(ctx context.Context, infohash, subpath string) (data []byte, err error) {
func (tm *TorrentManager) GetFile(ctx context.Context, infohash, subpath string) (data []byte, mu *event.TypeMux, err error) {
getfileMeter.Mark(1)
if tm.metrics {
defer func(start time.Time) { tm.Updates += time.Since(start) }(time.Now())
}

if !common.IsHexAddress(infohash) {
return nil, errors.New("invalid infohash format")
return nil, nil, errors.New("invalid infohash format")
}

infohash = strings.TrimPrefix(strings.ToLower(infohash), common.Prefix)
Expand All @@ -78,13 +79,14 @@ func (tm *TorrentManager) GetFile(ctx context.Context, infohash, subpath string)

if t := tm.getTorrent(infohash); t != nil {
if !t.Ready() {
return nil, ErrUnfinished
return nil, t.Mux(), ErrUnfinished
}

// Data protection when torrent is active
t.RLock()
defer t.RUnlock()

mu = t.Mux()
}

diskReadMeter.Mark(1)
Expand Down
Loading

0 comments on commit bb5dc3b

Please sign in to comment.