Skip to content

Commit

Permalink
Add (*Reader).CopyNext(w) (int64, error)
Browse files Browse the repository at this point in the history
It is useful to be able to efficiently copy objects without
decoding them.

My use case is filtering when I already know the indices of
the objects I want to keep, and for rewriting a dictionary
of objects as a column of objects.
  • Loading branch information
pwaller committed Nov 11, 2016
1 parent ad0ff2e commit 5ca5615
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
16 changes: 16 additions & 0 deletions msgp/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,19 @@ func (e *ErrUnsupportedType) Error() string { return fmt.Sprintf("msgp: type %q

// Resumable returns 'true' for ErrUnsupportedType
func (e *ErrUnsupportedType) Resumable() bool { return true }

// ReadNextError is returned the buffer passed to ReadNext is not large enough.
type ReadNextError struct {
Got uintptr
Wanted uintptr
}

// Error implements the error interface
func (err ReadNextError) Error() string {
return fmt.Sprintf("msgp: slice not big enough (%d < %d)",
err.Got, err.Wanted)
}

// Resumable is always 'false' for ReadNextError, since we may be inside a
// recursive call which has already partially consumed the stream.
func (err ReadNextError) Resumable() bool { return false }
62 changes: 62 additions & 0 deletions msgp/read.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package msgp

import (
"fmt"
"io"
"math"
"sync"
Expand Down Expand Up @@ -146,6 +147,65 @@ func (m *Reader) Read(p []byte) (int, error) {
return m.R.Read(p)
}

// ReadNext reads the raw bytes for the next object on the wire into p.
// If p is not large enough, an error is returned. See GetNextSize.
func (m *Reader) ReadNext(p []byte) (int, error) {
sz, o, err := getNextSize(m.R)
if err != nil {
return 0, err
}
if uintptr(cap(p)) < sz {
return 0, ReadNextError{uintptr(len(p)), uintptr(sz)}
}
n, err := m.R.ReadFull(p[:sz])
if err != nil {
return 0, err
}
if uintptr(n) != sz {
return 0, fmt.Errorf("wrong # bytes read (%d != %d)", n, int64(sz))
}

p = p[n:n:cap(p)]
tot := n

// for maps and slices, read elements
for x := uintptr(0); x < o; x++ {
n, err = m.ReadNext(p)
if err != nil {
return 0, err
}
p = p[n:n:cap(p)]
tot += n
}
return tot, nil
}

// CopyNext reads the next object from m without decoding it and writes it to w.
// It avoids unnecessary copies internally.
func (m *Reader) CopyNext(w io.Writer) (int64, error) {
sz, o, err := getNextSize(m.R)
if err != nil {
return 0, err
}

// avoids allocating because m.R implements WriteTo.
n, err := io.CopyN(w, m.R, int64(sz))
if err != nil {
return 0, err
}

// for maps and slices, read elements
for x := uintptr(0); x < o; x++ {
var n2 int64
n2, err = m.CopyNext(w)
if err != nil {
return n, err
}
n += n2
}
return n, nil
}

// ReadFull implements `io.ReadFull`
func (m *Reader) ReadFull(p []byte) (int, error) {
return m.R.ReadFull(p)
Expand Down Expand Up @@ -194,8 +254,10 @@ func (m *Reader) IsNil() bool {
return err == nil && p[0] == mnil
}

// getNextSize returns the size of the next object on the wire.
// returns (obj size, obj elements, error)
// only maps and arrays have non-zero obj elements
// for maps and arrays, obj size does not include elements
//
// use uintptr b/c it's guaranteed to be large enough
// to hold whatever we can fit in memory.
Expand Down
90 changes: 90 additions & 0 deletions msgp/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,93 @@ func BenchmarkSkip(b *testing.B) {
}
}
}

func TestReadNext(t *testing.T) {
var buf bytes.Buffer
en := NewWriter(&buf)

en.WriteMapHeader(6)

en.WriteString("thing_one")
en.WriteString("value_one")

en.WriteString("thing_two")
en.WriteFloat64(3.14159)

en.WriteString("some_bytes")
en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd"))

en.WriteString("the_time")
en.WriteTime(time.Now())

en.WriteString("what?")
en.WriteBool(true)

en.WriteString("ext")
en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")})

en.Flush()

// Read from a copy of the original buf.
de := NewReader(bytes.NewReader(buf.Bytes()))
p := make([]byte, 0, len(buf.Bytes()))
n, err := de.ReadNext(p)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(buf.Bytes(), p[:n]) {
t.Fatalf("not equal! %v, %v", buf.Bytes(), p[:n])
}
}

func TestCopyNext(t *testing.T) {
var buf bytes.Buffer
en := NewWriter(&buf)

en.WriteMapHeader(6)

en.WriteString("thing_one")
en.WriteString("value_one")

en.WriteString("thing_two")
en.WriteFloat64(3.14159)

en.WriteString("some_bytes")
en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd"))

en.WriteString("the_time")
en.WriteTime(time.Now())

en.WriteString("what?")
en.WriteBool(true)

en.WriteString("ext")
en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")})

en.Flush()

// Read from a copy of the original buf.
de := NewReader(bytes.NewReader(buf.Bytes()))

w := new(bytes.Buffer)

n, err := de.CopyNext(w)
if err != nil {
t.Fatal(err)
}
if n != int64(buf.Len()) {
t.Fatalf("CopyNext returned the wrong value (%d != %d)",
n, buf.Len())
}

// p := make([]byte, 0, len(buf.Bytes()))
// n, err := de.ReadNext(p)
// if err != nil {
// t.Fatal(err)
// }

if !bytes.Equal(buf.Bytes(), w.Bytes()) {
t.Fatalf("not equal! %v, %v", buf.Bytes(), w.Bytes())
}
}

0 comments on commit 5ca5615

Please sign in to comment.