Skip to content

Commit

Permalink
Add (*Reader).CopyNext(w) (int64, error) (tinylib#167)
Browse files Browse the repository at this point in the history
* Add (*Reader).CopyNext(w) (int64, error)

It is useful to be able to efficiently copy objects without
decoding them.

My use case is filtering when I already know the indices of
the objects I want to keep, and for rewriting a dictionary
of objects as a column of objects.

* Remove ReadNext

* Add opportunistic optimization with m.R.Next

* Remove unused ReadNextError

* Remove commented code

* small fixup

- only call (*Reader).Next() when we're sure it won't realloc its buffer
- promote io.ErrUnexpectedEOF to msgp.ErrShortBytes
  • Loading branch information
pwaller authored and Sergey Kamardin committed Nov 20, 2017
1 parent 8eedca8 commit 1f2d896
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
52 changes: 52 additions & 0 deletions msgp/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,56 @@ func (m *Reader) Read(p []byte) (int, error) {
return m.R.Read(p)
}

// CopyNext reads the next object from m without decoding it and writes it to w.
// It avoids unnecessary copies internally.
func (m *Reader) CopyNext(w io.Writer) (int64, error) {
sz, o, err := getNextSize(m.R)
if err != nil {
return 0, err
}

var n int64
// Opportunistic optimization: if we can fit the whole thing in the m.R
// buffer, then just get a pointer to that, and pass it to w.Write,
// avoiding an allocation.
if int(sz) <= m.R.BufferSize() {
var nn int
var buf []byte
buf, err = m.R.Next(int(sz))
if err != nil {
if err == io.ErrUnexpectedEOF {
err = ErrShortBytes
}
return 0, err
}
nn, err = w.Write(buf)
n += int64(nn)
} else {
// Fall back to io.CopyN.
// May avoid allocating if w is a ReaderFrom (e.g. bytes.Buffer)
n, err = io.CopyN(w, m.R, int64(sz))
if err == io.ErrUnexpectedEOF {
err = ErrShortBytes
}
}
if err != nil {
return n, err
} else if n < int64(sz) {
return n, io.ErrShortWrite
}

// for maps and slices, read elements
for x := uintptr(0); x < o; x++ {
var n2 int64
n2, err = m.CopyNext(w)
if err != nil {
return n, err
}
n += n2
}
return n, nil
}

// ReadFull implements `io.ReadFull`
func (m *Reader) ReadFull(p []byte) (int, error) {
return m.R.ReadFull(p)
Expand Down Expand Up @@ -194,8 +244,10 @@ func (m *Reader) IsNil() bool {
return err == nil && p[0] == mnil
}

// getNextSize returns the size of the next object on the wire.
// returns (obj size, obj elements, error)
// only maps and arrays have non-zero obj elements
// for maps and arrays, obj size does not include elements
//
// use uintptr b/c it's guaranteed to be large enough
// to hold whatever we can fit in memory.
Expand Down
45 changes: 45 additions & 0 deletions msgp/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,3 +745,48 @@ func BenchmarkSkip(b *testing.B) {
}
}
}

func TestCopyNext(t *testing.T) {
var buf bytes.Buffer
en := NewWriter(&buf)

en.WriteMapHeader(6)

en.WriteString("thing_one")
en.WriteString("value_one")

en.WriteString("thing_two")
en.WriteFloat64(3.14159)

en.WriteString("some_bytes")
en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd"))

en.WriteString("the_time")
en.WriteTime(time.Now())

en.WriteString("what?")
en.WriteBool(true)

en.WriteString("ext")
en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")})

en.Flush()

// Read from a copy of the original buf.
de := NewReader(bytes.NewReader(buf.Bytes()))

w := new(bytes.Buffer)

n, err := de.CopyNext(w)
if err != nil {
t.Fatal(err)
}
if n != int64(buf.Len()) {
t.Fatalf("CopyNext returned the wrong value (%d != %d)",
n, buf.Len())
}

if !bytes.Equal(buf.Bytes(), w.Bytes()) {
t.Fatalf("not equal! %v, %v", buf.Bytes(), w.Bytes())
}
}

0 comments on commit 1f2d896

Please sign in to comment.