Skip to content

Commit

Permalink
Export GetNextSize and implement ReadNext
Browse files Browse the repository at this point in the history
It is useful to be able to efficiently copy objects without
decoding them.

My use case is filtering when I already know the indices of
the objects I want to keep, and for rewriting a dictionary
of objects as a column of objects.

This commit:

1. Exports `GetNextSize`.
2. Adds a method `ReadNext(p)` to `*Reader`.

I wasn't sure about exporting `GetNextSize`, but I can't see the
harm in it, and it may be useful for finer-grained control in
the copying.

I also experimented with a `NextReader() io.Reader` method which
returned `io.LimitReader(m.R, GetNextSize()), but for my use case
this was twice as slow and also did not handle nested objects.
In principle that might be useful for very large simple objects,
but is not included in this PR.
  • Loading branch information
pwaller committed Nov 11, 2016
1 parent ad0ff2e commit 8a1bc12
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 3 deletions.
40 changes: 38 additions & 2 deletions msgp/read.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package msgp

import (
"fmt"
"io"
"math"
"sync"
Expand Down Expand Up @@ -146,6 +147,39 @@ func (m *Reader) Read(p []byte) (int, error) {
return m.R.Read(p)
}

// ReadNext reads the raw bytes for the next object on the wire into p.
// If p is not large enough, an error is returned. See GetNextSize.
func (m *Reader) ReadNext(p []byte) (int, error) {
sz, o, err := GetNextSize(m.R)
if err != nil {
return 0, err
}
if uintptr(cap(p)) < sz {
return 0, fmt.Errorf("p not big enough (%d < %d)", len(p), sz)
}
n, err := m.R.ReadFull(p[:sz])
if err != nil {
return 0, err
}
if uintptr(n) != sz {
return 0, fmt.Errorf("wrong # bytes read (%d != %d)", n, int64(sz))
}

p = p[n:n:cap(p)]
tot := n

// for maps and slices, read elements
for x := uintptr(0); x < o; x++ {
n, err = m.ReadNext(p)
if err != nil {
return 0, err
}
p = p[n:n:cap(p)]
tot += n
}
return tot, nil
}

// ReadFull implements `io.ReadFull`
func (m *Reader) ReadFull(p []byte) (int, error) {
return m.R.ReadFull(p)
Expand Down Expand Up @@ -194,12 +228,14 @@ func (m *Reader) IsNil() bool {
return err == nil && p[0] == mnil
}

// GetNextSize returns the size of the next object on the wire.
// returns (obj size, obj elements, error)
// only maps and arrays have non-zero obj elements
// for maps and arrays, obj size does not include elements
//
// use uintptr b/c it's guaranteed to be large enough
// to hold whatever we can fit in memory.
func getNextSize(r *fwd.Reader) (uintptr, uintptr, error) {
func GetNextSize(r *fwd.Reader) (uintptr, uintptr, error) {
b, err := r.Peek(1)
if err != nil {
return 0, 0, err
Expand Down Expand Up @@ -261,7 +297,7 @@ func (m *Reader) Skip() error {
return err
}
} else {
v, o, err = getNextSize(m.R)
v, o, err = GetNextSize(m.R)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion msgp/read_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r Raw) Msgsize() int {
}

func appendNext(f *Reader, d *[]byte) error {
amt, o, err := getNextSize(f.R)
amt, o, err := GetNextSize(f.R)
if err != nil {
return err
}
Expand Down
44 changes: 44 additions & 0 deletions msgp/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,47 @@ func BenchmarkSkip(b *testing.B) {
}
}
}

func TestReadNext(t *testing.T) {
var buf bytes.Buffer
en := NewWriter(&buf)

en.WriteMapHeader(6)

en.WriteString("thing_one")
en.WriteString("value_one")

en.WriteString("thing_two")
en.WriteFloat64(3.14159)

en.WriteString("some_bytes")
en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd"))

en.WriteString("the_time")
en.WriteTime(time.Now())

en.WriteString("what?")
en.WriteBool(true)

en.WriteString("ext")
en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")})

en.Flush()

// Read from a copy of the original buf.
de := NewReader(bytes.NewReader(buf.Bytes()))
p := make([]byte, 0, len(buf.Bytes()))
n, err := de.ReadNext(p)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(buf.Bytes(), p[:n]) {
t.Fatalf("not equal! %v, %v", buf.Bytes(), p[:n])
}

// Copy the bytes
// cpy := make([]byte, len(buf.Bytes())
// copy(cpy, buf.Bytes())

}

0 comments on commit 8a1bc12

Please sign in to comment.