-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgostream.go
83 lines (70 loc) · 2.86 KB
/
gostream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main
import (
"context"
"encoding/binary"
"fmt"
"io"
"github.com/golang/protobuf/proto"
)
// This is forked from https://github.com/robinpowered/go-proto
type MessageCollection []proto.Message
type UnmarshalFunc func([]byte) (proto.Message, error)
const (
MaximumDataRecordLength = 1024 * 10
)
// ReadLengthPrefixedCollection reads a collection of protocol buffer messages from the supplied reader.
// Each message is presumed prefixed by a 32 bit varint which represents the size of the ensuing message.
// The UnmarshalFunc argument is a supplied callback used to convert the raw bytes read as a message to the desired message type.
// The protocol buffer message collection is returned, along with any error arising.
// For more detailed information on this approach, see the official protocol buffer documentation https://developers.google.com/protocol-buffers/docs/techniques#streaming.
func ReadLengthPrefixedCollection(ctx context.Context, maximumMessageLength uint64, r io.Reader, f UnmarshalFunc) (pbs MessageCollection, totalBytesRead int, err error) {
position := 0
for {
var prefixBuf [binary.MaxVarintLen32]byte
var bytesRead, varIntBytes int
var messageLength uint64
for varIntBytes == 0 { // i.e. no varint has been decoded yet.
if bytesRead >= len(prefixBuf) {
return pbs, position, fmt.Errorf("Invalid varint32 encountered (position = %d)", position)
}
// We have to read byte by byte here to avoid reading more bytes
// than required. Each read byte is appended to what we have
// read before.
newBytesRead, err := r.Read(prefixBuf[bytesRead : bytesRead+1])
if newBytesRead == 0 {
if io.EOF == err {
return pbs, position, nil
} else if err != nil {
return pbs, position, fmt.Errorf("Error reading length (position = %d) (%v)", position, err)
}
// A Reader should not return (0, nil), but if it does,
// it should be treated as no-op (according to the
// Reader contract). So let's go on...
continue
}
bytesRead += newBytesRead
position += newBytesRead
// Now present everything read so far to the varint decoder and
// see if a varint can be decoded already.
messageLength, varIntBytes = proto.DecodeVarint(prefixBuf[:bytesRead])
}
if messageLength > maximumMessageLength {
return pbs, position, fmt.Errorf("Refusing to allocate %d bytes (position = %d)", messageLength, position)
}
messageBuf := make([]byte, messageLength)
newBytesRead, err := io.ReadFull(r, messageBuf)
bytesRead += newBytesRead
if err != nil {
return pbs, position, fmt.Errorf("Error reading message (position = %d) (%v)", position, err)
}
pb, err := f(messageBuf)
if io.EOF == err {
return pbs, position, nil
}
if nil != err {
return nil, position, fmt.Errorf("Error handling raw message (position = %d) (%v)", position, err)
}
position += newBytesRead
pbs = append(pbs, pb)
}
}