-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtransmitter.go
63 lines (56 loc) · 1.71 KB
/
transmitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package pump
import (
	"errors"
	"io"
	"sort"
)
// ChunkRatioInvalid is the error Transmitter.ActivateChunk returns when the
// chunk it is given fails its validity check.
var ChunkRatioInvalid = errors.New("the ratio of chunk to packet size is too high - either use larger packets or a smaller chunk")
// Transmitter produces packets for the chunks that are currently active.
// Objects are registered with AddObject, chunks of those objects are loaded
// with ActivateChunk, and GeneratePacket then emits one packet per call,
// rotating over the active chunks.
type Transmitter struct {
// readers maps every Object registered through AddObject to the reader
// that supplies its bytes.
readers map[Object]io.ReaderAt
// chunkEncoders holds one encoder per activated chunk; its key set is
// what the transmitter treats as the set of active chunks.
chunkEncoders map[Chunk]*chunkEncoder
// chunkIndex counts chooseChunk calls and is reduced modulo the number
// of active chunks to pick the next chunk.
chunkIndex int64
// chunkPacketIndexes records, per chunk, the packet index that the next
// generated packet for that chunk will use.
chunkPacketIndexes map[Chunk]int64
}
// NewTransmitter returns a Transmitter whose reader, encoder and packet-index
// maps are allocated and empty, so objects and chunks can be added right away.
func NewTransmitter() *Transmitter {
	tx := new(Transmitter)
	tx.readers = map[Object]io.ReaderAt{}
	tx.chunkEncoders = map[Chunk]*chunkEncoder{}
	tx.chunkPacketIndexes = map[Chunk]int64{}
	return tx
}
// AddObject registers r as the data source for an object and returns the
// Object value that identifies it.
//
// The returned Object carries id as its ID and totalSize as its Size. Adding
// an object with the same id and totalSize again replaces the reader that was
// stored for it.
func (tx *Transmitter) AddObject(id string, r io.ReaderAt, totalSize int64) (o Object) {
	o = Object{ID: id, Size: totalSize}
	tx.readers[o] = r
	return o
}
// ErrUnknownObject is returned by Transmitter.ActivateChunk when the chunk
// belongs to an object that has no reader registered through AddObject.
var ErrUnknownObject = errors.New("chunk refers to an object that has not been added")

// ActivateChunk reads the chunk's bytes from its object's reader, builds an
// encoder for them and adds the chunk to the active set.
//
// It returns ChunkRatioInvalid when the chunk fails its validity check,
// ErrUnknownObject when no reader is registered for chunk.Object, and the
// reader's own error when the read fails. Nothing is activated on error.
func (tx *Transmitter) ActivateChunk(chunk Chunk) error {
	if !chunk.valid() {
		return ChunkRatioInvalid
	}

	// A missing map entry yields a nil interface; calling ReadAt on it
	// would panic, so reject it explicitly.
	r, ok := tx.readers[chunk.Object]
	if !ok || r == nil {
		return ErrUnknownObject
	}

	data := make([]byte, chunk.Size)
	// io.EOF is tolerated: a chunk that reaches the end of the object may
	// read short, and the unread tail of data then stays zero-filled, as it
	// did before errors were checked. Any other error means data cannot be
	// trusted.
	if _, err := r.ReadAt(data, chunk.Offset); err != nil && !errors.Is(err, io.EOF) {
		return err
	}

	tx.chunkEncoders[chunk] = chunk.encoder(data)
	return nil
}
// GeneratePacket emits the next packet: it picks the next active chunk,
// takes that chunk's next packet index, and asks the chunk's encoder for the
// packet at that index.
//
// At least one chunk must be active; the call panics otherwise.
func (tx *Transmitter) GeneratePacket() (packet Packet) {
	chunk := tx.chooseChunk()
	index := tx.choosePacketIndex(chunk)
	encoder := tx.chunkEncoders[chunk]
	packet = encoder.generatePacket(index)
	return packet
}
// DeactivateChunk removes chunk from the active set so that GeneratePacket
// no longer produces packets for it and its encoder can be reclaimed.
// Deactivating a chunk that is not active is a no-op.
//
// GeneratePacket must not be called while no chunk is active.
func (tx *Transmitter) DeactivateChunk(chunk Chunk) {
	// The chunk's entry in chunkPacketIndexes is kept on purpose: if the
	// chunk is activated again, packet numbering resumes at the next unused
	// index instead of starting over at zero.
	delete(tx.chunkEncoders, chunk)
}
// chooseChunk returns the chunk the next packet should come from and
// advances the rotation counter. The chunk is the element of activeChunks at
// position chunkIndex modulo the number of active chunks.
//
// It panics when no chunk is active: there is nothing to choose, and the
// method has no error result to report that through.
func (tx *Transmitter) chooseChunk() Chunk {
	// Take the slice once so the length used for the modulo and the slice
	// being indexed are guaranteed to agree.
	active := tx.activeChunks()
	if len(active) == 0 {
		// Fail with a clear message rather than an integer divide by zero.
		panic("pump: no active chunks; call ActivateChunk before GeneratePacket")
	}

	idx := tx.chunkIndex % int64(len(active))
	tx.chunkIndex++
	return active[idx]
}
// activeChunks returns the chunks that currently have an encoder, ordered by
// object ID, then offset, then size.
//
// Map iteration order is randomized in Go, so without the sort the same
// position could refer to a different chunk on every call and a caller
// stepping through positions would not actually visit the chunks in turn.
// Chunks that tie on all three keys (possible only if Chunk has further
// fields) keep an unspecified relative order.
func (tx *Transmitter) activeChunks() (activeChunks []Chunk) {
	// Rebuilding and sorting on every call is not optimal, but good enough
	// since N is usually small.
	activeChunks = make([]Chunk, 0, len(tx.chunkEncoders))
	for c := range tx.chunkEncoders {
		activeChunks = append(activeChunks, c)
	}

	sort.Slice(activeChunks, func(i, j int) bool {
		a, b := activeChunks[i], activeChunks[j]
		if a.Object.ID != b.Object.ID {
			return a.Object.ID < b.Object.ID
		}
		if a.Offset != b.Offset {
			return a.Offset < b.Offset
		}
		return a.Size < b.Size
	})
	return activeChunks
}
// choosePacketIndex hands out the packet index to use for chunk's next
// packet and bumps the stored counter by one. A chunk seen for the first
// time starts at index 0.
func (tx *Transmitter) choosePacketIndex(chunk Chunk) int64 {
	next := tx.chunkPacketIndexes[chunk]
	tx.chunkPacketIndexes[chunk] = next + 1
	return next
}