Skip to content

Commit

Permalink
Use message headers when downloading chunks via CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
kthomas committed Mar 7, 2024
1 parent 4a2176c commit 6f9f109
Showing 1 changed file with 32 additions and 16 deletions.
48 changes: 32 additions & 16 deletions internal/catalogserver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/nats-io/nats.go"
Expand All @@ -16,14 +17,11 @@ import (

type Client struct {
nc *nats.Conn

dlResponses chan models.TypedApiResult[models.DownloadResponse]
}

func NewClient(nc *nats.Conn) *Client {
return &Client{
nc: nc,
dlResponses: make(chan models.TypedApiResult[models.DownloadResponse]),
nc: nc,
}
}

Expand Down Expand Up @@ -75,28 +73,47 @@ func (c *Client) DownloadFile(catalog string, hash string, targetPath string) er

ch := make(chan []byte)

chunkCount := 0

// subscribe refers to data that doesn't exist yet, so we can't use a closure of
// a local var...so we have to use a *sob* global
c.nc.Subscribe(subscribeSubject, func(m *nats.Msg) {
lastResponse := <-c.dlResponses
decrypted, err := targetKp.Open(m.Data, lastResponse.Data.SenderXKey)
sub, err := c.nc.Subscribe(subscribeSubject, func(m *nats.Msg) {
senderXKey := m.Header.Get("x-natster-sender-xkey")

chunkIdx, err := strconv.Atoi(m.Header.Get("x-natster-chunk-idx"))
if err != nil {
slog.Error("Failed to parse x-natster-chunk-idx header", err,
slog.String("sender_key", senderXKey),
)
}

totalChunks, err := strconv.Atoi(m.Header.Get("x-natster-total-chunks"))
if err != nil {
slog.Error("Failed to parse x-natster-chunk-idx header", err,
slog.String("sender_key", senderXKey),
)
}

decrypted, err := targetKp.Open(m.Data, senderXKey)
if err != nil {
fmt.Printf("(%+v)\n", lastResponse)
slog.Error("Failed to decrypt chunk", err,
slog.String("sender_key", lastResponse.Data.SenderXKey),
slog.Int("chunk_idx", chunkIdx),
slog.String("sender_key", senderXKey),
)
}
//_, _ = writer.Write(decrypted)

ch <- decrypted

fmt.Printf("Received chunk %d (%d bytes)\n", chunkCount, len(decrypted))
chunkCount := chunkCount + 1
if chunkCount == int(lastResponse.Data.TotalChunks) {
fmt.Printf("Received chunk %d (%d bytes)\n", chunkIdx, len(decrypted))
if chunkIdx == totalChunks-1 {
close(ch)
}
})
if err != nil {
return err
}

defer func() {
_ = sub.Unsubscribe()
}()

dlRequest := models.DownloadRequest{
Hash: hash,
Expand All @@ -113,7 +130,6 @@ func (c *Client) DownloadFile(catalog string, hash string, targetPath string) er
if err != nil {
return err
}
c.dlResponses <- newResponse

fmt.Printf("File download request acknowledged: %d bytes (%d chunks of %d bytes each.) from %s\n",
newResponse.Data.TotalBytes,
Expand Down

0 comments on commit 6f9f109

Please sign in to comment.