chunkwriting.go
package azblob

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	guuid "github.com/google/uuid"
)
// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
// This allows us to provide a local implementation that fakes the server for hermetic testing.
type blockWriter interface {
	StageBlock(context.Context, string, io.ReadSeeker, LeaseAccessConditions, []byte, ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error)
	CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions, AccessTierType, BlobTagsMap, ClientProvidedKeyOptions, ImmutabilityPolicyOptions) (*BlockBlobCommitBlockListResponse, error)
}
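
// To illustrate the hermetic-testing point above, a minimal in-memory fake could look like the
// sketch below. This is hypothetical test scaffolding, not part of the package; it assumes
// zero-value responses are acceptable to the caller and that concurrent StageBlock calls are
// the only thing that needs guarding.
//
//	type fakeBlockWriter struct {
//		mu      sync.Mutex
//		blocks  map[string][]byte // staged blocks keyed by block ID
//		commits [][]string        // committed block lists, in order
//	}
//
//	func (f *fakeBlockWriter) StageBlock(ctx context.Context, id string, r io.ReadSeeker, _ LeaseAccessConditions, _ []byte, _ ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error) {
//		b, err := io.ReadAll(r)
//		if err != nil {
//			return nil, err
//		}
//		f.mu.Lock()
//		defer f.mu.Unlock()
//		if f.blocks == nil {
//			f.blocks = map[string][]byte{}
//		}
//		f.blocks[id] = b
//		return &BlockBlobStageBlockResponse{}, nil
//	}
//
//	func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, ids []string, _ BlobHTTPHeaders, _ Metadata, _ BlobAccessConditions, _ AccessTierType, _ BlobTagsMap, _ ClientProvidedKeyOptions, _ ImmutabilityPolicyOptions) (*BlockBlobCommitBlockListResponse, error) {
//		f.mu.Lock()
//		defer f.mu.Unlock()
//		f.commits = append(f.commits, append([]string(nil), ids...))
//		return &BlockBlobCommitBlockListResponse{}, nil
//	}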
// copyFromReader copies a source io.Reader to blob storage using concurrent uploads.
// TODO(someone): The existing model provides a buffer size and buffer limit as limiting factors. The buffer size is probably
// useless other than needing to be above some number, as the network stack is going to chop up the buffer above some size. The
// max buffers caps how much memory we use (max buffers times the buffer size) and how many goroutines can upload
// at a time. I think having a single max-memory dial would be more efficient. We can choose an internal buffer size that works
// well, 4 MiB or 8 MiB, and autoscale to as many goroutines as fit within the memory limit. This gives a single dial to tweak, and we can
// choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum-throughput model).
// We can even provide a utility to dial this number in for customer networks to optimize their copies.
func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o UploadStreamToBlockBlobOptions) (*BlockBlobCommitBlockListResponse, error) {
	if err := o.defaults(); err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	cp := &copier{
		ctx:    ctx,
		cancel: cancel,
		reader: from,
		to:     to,
		id:     newID(),
		o:      o,
		errCh:  make(chan error, 1),
	}

	// Send all our chunks until we get an error.
	var err error
	for {
		if err = cp.sendChunk(); err != nil {
			break
		}
	}
	// If the error is not EOF, then we have a problem.
	if err != nil && !errors.Is(err, io.EOF) {
		cp.wg.Wait()
		return nil, err
	}

	// Close out our upload.
	if err := cp.close(); err != nil {
		return nil, err
	}

	return cp.result, nil
}
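
// A minimal sketch of the single max-memory dial proposed in the TODO above (hypothetical, not
// part of this package): fix the internal buffer size and derive the buffer/goroutine cap from
// a memory budget, so callers tune one number instead of two.
//
//	const internalBufferSize = 4 * 1024 * 1024 // 4 MiB; assumed value
//
//	func buffersFor(memoryLimit int64) int {
//		n := int(memoryLimit / internalBufferSize)
//		if n < 1 {
//			n = 1 // always allow at least one in-flight chunk
//		}
//		return n
//	}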
// copier streams a file to blob storage in parallel chunks, reading from an io.Reader.
// Do not use directly; instead use copyFromReader().
type copier struct {
	// ctx holds the context of a copier. Storing a Context in a struct is normally a faux pas;
	// in this case the copier has the lifetime of a function call, so it's fine.
	ctx    context.Context
	cancel context.CancelFunc

	// o contains our options for uploading.
	o UploadStreamToBlockBlobOptions

	// id provides the IDs for each chunk.
	id *id

	// reader is the source to be written to storage.
	reader io.Reader
	// to is the location we are writing our chunks to.
	to blockWriter

	// errCh is used to hold the first error from our concurrent writers.
	errCh chan error
	// wg provides a count of how many writers we are waiting to finish.
	wg sync.WaitGroup

	// result holds the final result from blob storage after we have submitted all chunks.
	result *BlockBlobCommitBlockListResponse
}
// copierChunk is a single buffered chunk scheduled for upload.
type copierChunk struct {
	buffer []byte // the chunk's data; only the first length bytes are valid
	id     string // the block ID for this chunk
	length int    // number of valid bytes in buffer
}
// getErr returns an error by priority: first, any error set by a writer; next, any error from
// the Context; otherwise nil. getErr supports returning a writer error only once per copier.
func (c *copier) getErr() error {
	select {
	case err := <-c.errCh:
		return err
	default:
	}
	return c.ctx.Err()
}
// sendChunk reads data from our internal reader, creates a chunk, and sends it to be written via a channel.
// sendChunk returns io.EOF when the reader returns an io.EOF or io.ErrUnexpectedEOF.
func (c *copier) sendChunk() error {
	if err := c.getErr(); err != nil {
		return err
	}

	buffer := c.o.TransferManager.Get()
	if len(buffer) == 0 {
		return fmt.Errorf("TransferManager returned a zero-size buffer; this is a bug in the manager")
	}

	n, err := io.ReadFull(c.reader, buffer)
	if n > 0 {
		// Some data was read; schedule the write.
		id := c.id.next()
		c.wg.Add(1)
		c.o.TransferManager.Run(
			func() {
				defer c.wg.Done()
				c.write(copierChunk{buffer: buffer, id: id, length: n})
			},
		)
	} else {
		// Return the unused buffer to the manager.
		c.o.TransferManager.Put(buffer)
	}

	if err == nil {
		return nil
	} else if err == io.EOF || err == io.ErrUnexpectedEOF {
		return io.EOF
	}

	if cerr := c.getErr(); cerr != nil {
		return cerr
	}
	return err
}
// write uploads a chunk to blob storage.
func (c *copier) write(chunk copierChunk) {
	defer c.o.TransferManager.Put(chunk.buffer)

	if err := c.ctx.Err(); err != nil {
		return
	}

	_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer[:chunk.length]), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
	if err != nil {
		// Report the error without blocking: errCh has capacity 1 and is read at most once per
		// copier, so a second failing writer must not hang on the send (a blocking send here
		// would leak this goroutine and deadlock wg.Wait in close).
		select {
		case c.errCh <- fmt.Errorf("write error: %w", err):
		default:
		}
		return
	}
}
// close commits our blocks to blob storage and closes our writer.
func (c *copier) close() error {
	c.wg.Wait()

	if err := c.getErr(); err != nil {
		return err
	}

	var err error
	c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions, c.o.BlobAccessTier, c.o.BlobTagsMap, c.o.ClientProvidedKeyOptions, c.o.ImmutabilityPolicyOptions)
	return err
}
// id allows the creation of unique IDs based on UUID4 + an auto-incrementing uint32 counter.
type id struct {
	u   [64]byte
	num uint32
	all []string
}

// newID constructs a new id.
func newID() *id {
	uu := guuid.New()
	u := [64]byte{}
	copy(u[:], uu[:])
	return &id{u: u}
}

// next returns the next ID.
func (id *id) next() string {
	defer atomic.AddUint32(&id.num, 1)

	binary.BigEndian.PutUint32(id.u[len(guuid.UUID{}):], atomic.LoadUint32(&id.num))
	str := base64.StdEncoding.EncodeToString(id.u[:])
	id.all = append(id.all, str)

	return str
}

// issued returns all IDs that have been issued. The returned slice shares the internal storage,
// so it is not safe to modify, and it is only valid until the next call to next().
func (id *id) issued() []string {
	return id.all
}