Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PSL-1215] wn download multi-volume #898

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions supernode/services/download/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,12 @@ func (task *NftDownloadingTask) Download(ctx context.Context, txid, timestamp, s

<-task.NewAction(func(ctx context.Context) error {
log.WithContext(ctx).WithField("txid", txid).WithField("ttype", ttype).Info("Downloading File request received")
// Validate timestamp is not older than 10 minutes
// Validate timestamp is not older than 30 minutes
now := time.Now().UTC()
lastTenMinutes := now.Add(time.Duration(-10) * time.Minute)
lastTenMinutes := now.Add(time.Duration(-30) * time.Minute)
requestTime, _ := time.Parse(time.RFC3339, timestamp)
if lastTenMinutes.After(requestTime) {
err = errors.New("request time is older than 10 minutes")
err = errors.New("request time is older than 30 minutes")
task.UpdateStatus(common.StatusRequestTooLate)
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion walletnode/api/services/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
maxFileSize = 300 * 1024 * 1024 // 300MB in bytes
maxFileRegistrationAttempts = 3
downloadDeadline = 30 * time.Minute
downloadConcurrency = 1
)

// CascadeAPIHandler - CascadeAPIHandler service
Expand Down Expand Up @@ -320,7 +321,7 @@ func (service *CascadeAPIHandler) Download(ctx context.Context, p *cascade.Downl
}

// Channel to control the concurrency of downloads
sem := make(chan struct{}, 3) // Max 3 concurrent downloads
sem := make(chan struct{}, downloadConcurrency) // Max 3 concurrent downloads
taskResults := make(chan *DownloadResult)
errorsChan := make(chan error)

Expand Down
6 changes: 6 additions & 0 deletions walletnode/node/grpc/download_nft.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/pastelnetwork/gonode/common/types"
"github.com/pastelnetwork/gonode/common/utils"

"github.com/pastelnetwork/gonode/common/errors"
"github.com/pastelnetwork/gonode/common/log"
Expand Down Expand Up @@ -39,6 +40,7 @@ func (service *downloadNft) Download(ctx context.Context, txid, timestamp, signa
defer stream.CloseSend()

// Receive file
init := 0
for {
var resp *pb.DownloadReply
resp, err = stream.Recv()
Expand All @@ -50,6 +52,10 @@ func (service *downloadNft) Download(ctx context.Context, txid, timestamp, signa
return
}
file = append(file, resp.File...)
if utils.BytesIntToMB(len(file)) > init+10 {
log.WithContext(ctx).WithField("total-recieved", utils.BytesIntToMB(len(file))).Info("received 10 MB file chunk")
init = init + 10
}
}

return
Expand Down
9 changes: 8 additions & 1 deletion walletnode/services/cascaderegister/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"

Expand All @@ -23,7 +24,7 @@ import (

const (
maxConcurrentRegistrationsPerBlock = 5
uploadDataConcurrency = 3
uploadDataConcurrency = 4
)

// BlockRegistrations tracks ongoing registrations per block
Expand Down Expand Up @@ -361,6 +362,12 @@ func (task *CascadeRegistrationTask) runTicketRegActTask(ctx context.Context) (r
log.WithContext(ctx).WithError(err).Error("error waiting for Reg TXID confirmations")
return task.regCascadeTxid, "", errors.Errorf("wait reg-nft ticket valid: %w", err)
}

filePath := filepath.Join(task.service.config.CascadeFilesDir, task.Request.BaseFileID, task.Request.FileID)
if err := os.Remove(filePath); err != nil {
log.WithContext(ctx).WithField("file", filePath).WithError(err).Error("error removing file")
}

task.UpdateStatus(common.StatusTicketRegistered)
task.UpdateStatus(&common.EphemeralStatus{
StatusTitle: "Validated Cascade Reg TXID: ",
Expand Down
5 changes: 5 additions & 0 deletions walletnode/services/download/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (service *NftDownloadingService) GetFilenameAndSize(ctx context.Context, tx
if err != nil {
return "", 0, fmt.Errorf("unable to get action ticket: %w", err)
}
actionTicket, err := pastel.DecodeActionTicket([]byte(ticket.ActionTicketData.ActionTicket))
if err != nil {
return "", 0, fmt.Errorf("unable to decode action ticket: %w", err)
}
ticket.ActionTicketData.ActionTicketData = *actionTicket

casacdeTicket, err := ticket.ActionTicketData.ActionTicketData.APICascadeTicket()
if err != nil {
Expand Down
48 changes: 32 additions & 16 deletions walletnode/services/download/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"golang.org/x/sync/errgroup"
)

const (
requiredSuccessfulCount = 1
concurrentDownloads = 1
)

type downFile struct {
file []byte
pastelID string
Expand Down Expand Up @@ -119,14 +124,6 @@ func (task *NftDownloadingTask) run(ctx context.Context) (err error) {
return errors.Errorf("task failed after too many tries")
}

// Sign current-timestamp with PastelID passed in request
timestamp := time.Now().UTC().Format(time.RFC3339)
signature, err := task.service.pastelHandler.PastelClient.Sign(ctx, []byte(timestamp), task.Request.PastelID, task.Request.PastelIDPassphrase, pastel.SignAlgorithmED448)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("txid", task.Request.Txid).WithField("timestamp", timestamp).WithField("pastelid", task.Request.PastelID).Error("Could not sign timestamp")
continue
}

log.WithContext(ctx).WithField("txid", task.Request.Txid).WithField("skip nodes", skipNodes).WithField("tries", tries).Info("Connecting to supernodes")
if err = task.MeshHandler.ConnectToNSuperNodes(ctx, task.service.config.NumberSuperNodes, skipNodes); err != nil {
log.WithContext(ctx).WithError(err).WithField("txid", task.Request.Txid).Error("Could not connect to supernodes")
Expand All @@ -139,7 +136,9 @@ func (task *NftDownloadingTask) run(ctx context.Context) (err error) {
nodesDone = task.MeshHandler.ConnectionsSupervisor(ctx, cancel)
//send download requests to ALL Supernodes, number defined by mesh handler's "minNumberSuperNodes" (really just set in a config file as NumberSuperNodes)
//or max nodes that it was able to connect with defined by mesh handler config.UseMaxNodes
downloadErrs, badNodes, err := task.Download(ctx, task.Request.Txid, timestamp, string(signature), ttxid, task.Request.Type, tInfo.EstimatedDownloadTime)
// Sign current-timestamp with PastelID passed in request

downloadErrs, badNodes, err := task.Download(ctx, task.Request.Txid, ttxid, task.Request.Type, tInfo.EstimatedDownloadTime)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("txid", task.Request.Txid).WithField("download errors", downloadErrs).Error("Could not download files")
addSkipNodes(badNodes)
Expand Down Expand Up @@ -188,7 +187,7 @@ func (task *NftDownloadingTask) run(ctx context.Context) (err error) {
}

// Download downloads the file from supernodes.
func (task *NftDownloadingTask) Download(cctx context.Context, txid, timestamp, signature, ttxid, ttype string, timeout time.Duration) ([]error, []string, error) {
func (task *NftDownloadingTask) Download(cctx context.Context, txid, ttxid, ttype string, timeout time.Duration) ([]error, []string, error) {
var wg sync.WaitGroup
errChan := make(chan error, len(task.MeshHandler.Nodes))
var badNodes []string
Expand All @@ -199,7 +198,11 @@ func (task *NftDownloadingTask) Download(cctx context.Context, txid, timestamp,
defer cancel()

// Create a buffered channel to communicate successful downloads
successes := make(chan struct{}, 3)
successes := make(chan struct{}, requiredSuccessfulCount)

// Semaphore channel to limit concurrent downloads to 3
semaphore := make(chan struct{}, concurrentDownloads)

defer close(successes)

for _, someNode := range task.MeshHandler.Nodes {
Expand All @@ -210,14 +213,24 @@ func (task *NftDownloadingTask) Download(cctx context.Context, txid, timestamp,

someNode := someNode
wg.Add(1)

semaphore <- struct{}{}
go func() {
defer wg.Done()
defer func() { <-semaphore }() // Release semaphore when goroutine finishes

// Create a new context with a timeout for this goroutine
goroutineCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

file, subErr := nftDownNode.Download(goroutineCtx, txid, timestamp, signature, ttxid, ttype, task.Request.HashOnly)
timestamp := time.Now().UTC().Format(time.RFC3339)
signature, err := task.service.pastelHandler.PastelClient.Sign(ctx, []byte(timestamp), task.Request.PastelID, task.Request.PastelIDPassphrase, pastel.SignAlgorithmED448)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("txid", task.Request.Txid).WithField("timestamp", timestamp).WithField("pastelid", task.Request.PastelID).Error("Could not sign timestamp")
errChan <- err
}
log.WithContext(ctx).WithField("address", someNode.String()).WithField("txid", txid).Info("Downloading from supernode")
file, subErr := nftDownNode.Download(goroutineCtx, txid, timestamp, string(signature), ttxid, ttype, task.Request.HashOnly)
if subErr != nil || len(file) == 0 {
if subErr == nil {
subErr = errors.New("empty file")
Expand Down Expand Up @@ -252,12 +265,15 @@ func (task *NftDownloadingTask) Download(cctx context.Context, txid, timestamp,

// Wait for 3 successful downloads, then cancel the context
go func() {
for i := 0; i < 3; i++ {
<-successes
for i := 0; i < requiredSuccessfulCount; i++ {
select {
case <-successes:
case <-ctx.Done():
return
}
}
cancel()
}()

wg.Wait()

close(errChan)
Expand All @@ -274,7 +290,7 @@ func (task *NftDownloadingTask) Download(cctx context.Context, txid, timestamp,
func (task *NftDownloadingTask) MatchFiles() (int, []string, error) {
var badNodes []string

if len(task.files) < 3 {
if len(task.files) < requiredSuccessfulCount {
return 0, badNodes, errors.Errorf("number of files is less than 3, no. of files - %d", len(task.files))
}

Expand Down
Loading