-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
chunker.go
180 lines (154 loc) · 4.27 KB
/
chunker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package sources
import (
"bufio"
"bytes"
"errors"
"io"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)
const (
// ChunkSize is the number of bytes read for each chunk (10 KiB), not counting
// the peek data that is appended after it.
ChunkSize = 10 * 1024
// PeekSize is the maximum number of look-ahead bytes (3 KiB) appended to a
// chunk from the data that follows it. Peeked bytes are not consumed, so they
// are read again as the beginning of the next chunk.
PeekSize = 3 * 1024
// TotalChunkSize is the maximum size of an emitted chunk: ChunkSize bytes of
// data plus up to PeekSize bytes of peek data.
TotalChunkSize = ChunkSize + PeekSize
)
// Chunker splits originalChunk into chunks of at most TotalChunkSize bytes and
// returns a channel on which they are delivered.
//
// If the data already fits in one chunk (len(Data) <= TotalChunkSize) the
// original pointer is sent unchanged. Otherwise every emitted chunk is a
// shallow copy of originalChunk whose Data holds up to ChunkSize bytes followed
// by up to PeekSize bytes of look-ahead, so consecutive chunks overlap by up to
// PeekSize bytes.
//
// The channel has a one-slot buffer and is closed after the last chunk is sent.
// Sends block while the buffer is full, so callers must drain the channel.
func Chunker(originalChunk *Chunk) chan *Chunk {
	chunkChan := make(chan *Chunk, 1)
	go func() {
		defer close(chunkChan)
		if len(originalChunk.Data) <= TotalChunkSize {
			chunkChan <- originalChunk
			return
		}
		// One buffered reader is sufficient: its ChunkSize buffer is larger than
		// PeekSize, so every look-ahead can be served from it.
		reader := bufio.NewReaderSize(bytes.NewReader(originalChunk.Data), ChunkSize)
		for {
			// Reserve room for the peek data up front so the append below never
			// has to reallocate.
			chunkBytes := make([]byte, ChunkSize, TotalChunkSize)
			n, err := io.ReadFull(reader, chunkBytes)
			if n > 0 {
				// The Peek error is intentionally ignored: a short peek just means
				// fewer bytes remain, and whatever was returned is still appended.
				peekData, _ := reader.Peek(TotalChunkSize - n)
				chunk := *originalChunk
				chunk.Data = append(chunkBytes[:n], peekData...)
				chunkChan <- &chunk
			}
			// io.EOF / io.ErrUnexpectedEOF mark the end of the data.
			if err != nil {
				return
			}
		}
	}()
	return chunkChan
}
// chunkReaderConfig carries the sizing parameters a ChunkReader works with.
type chunkReaderConfig struct {
	// chunkSize is the number of bytes read for each chunk.
	chunkSize int
	// totalSize is chunkSize + peekSize; it is derived in applyOptions.
	totalSize int
	// peekSize is the maximum number of look-ahead bytes appended to a chunk.
	peekSize int
}
// ConfigOption is a functional option that mutates a chunkReaderConfig.
// Options are passed to NewChunkReader and applied, in order, on top of the
// default chunk and peek sizes.
type ConfigOption func(*chunkReaderConfig)
// WithChunkSize returns a ConfigOption that overrides the number of bytes read
// per chunk with size.
func WithChunkSize(size int) ConfigOption {
	setChunkSize := func(cfg *chunkReaderConfig) { cfg.chunkSize = size }
	return setChunkSize
}
// WithPeekSize returns a ConfigOption that overrides the number of look-ahead
// bytes appended to each chunk with size.
func WithPeekSize(size int) ConfigOption {
	setPeekSize := func(cfg *chunkReaderConfig) { cfg.peekSize = size }
	return setPeekSize
}
// ChunkResult is what a ChunkReader emits for each read: the bytes of the
// chunk together with any error encountered while reading it.
type ChunkResult struct {
	data []byte
	err  error
}

// Bytes returns the data held by the result.
func (r ChunkResult) Bytes() []byte { return r.data }

// Error returns the error held by the result, or nil when there is none.
func (r ChunkResult) Error() error { return r.err }
// ChunkReader reads data from reader in chunks and returns a single channel of
// ChunkResult values; each result carries a chunk's bytes and/or the error hit
// while reading it.
// Readers built by NewChunkReader close the channel once the input is
// exhausted, after a read error has been delivered, or when ctx is done.
// This should be used whenever a large amount of data is read from a reader.
// Ex: reading attachments, archives, etc.
type ChunkReader func(ctx context.Context, reader io.Reader) <-chan ChunkResult
// NewChunkReader builds a ChunkReader configured by opts; any setting that is
// not overridden keeps its default value.
func NewChunkReader(opts ...ConfigOption) ChunkReader {
	return createReaderFn(applyOptions(opts))
}
// applyOptions builds a chunkReaderConfig by starting from the package defaults
// (ChunkSize and PeekSize), applying opts in order, and then deriving totalSize
// as chunkSize + peekSize.
//
// Nil options are skipped. A chunkSize <= 0 or a negative peekSize cannot
// produce a working reader (reads would yield no chunks, or slicing the chunk
// buffer would panic inside the reader goroutine), so such values fall back to
// the defaults. A peekSize of 0 is valid and disables the look-ahead.
func applyOptions(opts []ConfigOption) *chunkReaderConfig {
	config := &chunkReaderConfig{
		chunkSize: ChunkSize,
		peekSize:  PeekSize,
	}

	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(config)
	}

	if config.chunkSize <= 0 {
		config.chunkSize = ChunkSize
	}
	if config.peekSize < 0 {
		config.peekSize = PeekSize
	}

	config.totalSize = config.chunkSize + config.peekSize
	return config
}
// createReaderFn binds config into a ChunkReader: the returned function hands
// its arguments to readInChunks together with that config.
func createReaderFn(config *chunkReaderConfig) ChunkReader {
	var chunkReader ChunkReader = func(ctx context.Context, reader io.Reader) <-chan ChunkResult {
		return readInChunks(ctx, reader, config)
	}
	return chunkReader
}
// readInChunks reads reader in a background goroutine and delivers the data on
// the returned channel as ChunkResult values.
//
// Each result holds up to config.chunkSize bytes followed by up to
// config.peekSize bytes of look-ahead that are not consumed and therefore start
// the next chunk as well. A read error other than io.EOF/io.ErrUnexpectedEOF is
// logged and delivered in the result (together with any bytes read before it).
//
// The channel is closed when the input is exhausted, after an error has been
// delivered, or when ctx is done while a result is waiting to be sent.
func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConfig) <-chan ChunkResult {
	const channelSize = 1

	// Peek can return at most one buffer's worth of data, so the buffer has to
	// hold the larger of chunkSize and peekSize; otherwise a peekSize greater
	// than chunkSize would be silently truncated.
	bufSize := config.chunkSize
	if config.peekSize > bufSize {
		bufSize = config.peekSize
	}
	chunkReader := bufio.NewReaderSize(reader, bufSize)
	chunkResultChan := make(chan ChunkResult, channelSize)

	go func() {
		defer close(chunkResultChan)

		for {
			chunkRes := ChunkResult{}
			// Reserve room for the peek data so the append below does not reallocate.
			chunkBytes := make([]byte, config.chunkSize, config.totalSize)
			n, err := io.ReadFull(chunkReader, chunkBytes)
			if n > 0 {
				// The Peek error is intentionally ignored: a short peek only means
				// fewer bytes are available, and what was returned is still used.
				peekData, _ := chunkReader.Peek(config.totalSize - n)
				chunkRes.data = append(chunkBytes[:n], peekData...)
			}

			// If there is an error other than EOF, or if we have read some bytes, send the chunk.
			// io.ReadFull will only return io.EOF when n == 0.
			switch {
			case isErrAndNotEOF(err):
				ctx.Logger().Error(err, "error reading chunk")
				chunkRes.err = err
			case n > 0:
				// Data without a reportable error: chunkRes.err stays nil.
			default:
				// Nothing read and nothing to report: the input is exhausted.
				return
			}

			select {
			case <-ctx.Done():
				return
			case chunkResultChan <- chunkRes:
			}

			// Any error, including io.ErrUnexpectedEOF on a short final chunk,
			// ends the stream once the result has been sent.
			if err != nil {
				return
			}
		}
	}()

	return chunkResultChan
}
// isErrAndNotEOF reports whether err is a genuine read failure worth flagging.
// It returns false for nil and for any error that is, or wraps, io.EOF or
// io.ErrUnexpectedEOF, and true for every other error.
func isErrAndNotEOF(err error) bool {
	if err == nil {
		return false
	}
	return !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF)
}