From 1f2d896bf9e180684804b4a6e13ac00aa02cb3aa Mon Sep 17 00:00:00 2001 From: Peter Waller Date: Tue, 13 Dec 2016 19:44:48 +0000 Subject: [PATCH] Add (*Reader).CopyNext(w) (int64, error) (#167) * Add (*Reader).CopyNext(w) (int64, error) It is useful to be able to efficiently copy objects without decoding them. My use case is filtering when I already know the indices of the objects I want to keep, and for rewriting a dictionary of objects as a column of objects. * Remove ReadNext * Add opportunistic optimization with m.R.Next * Remove unused ReadNextError * Remove commented code * small fixup - only call (*Reader).Next() when we're sure it won't realloc its buffer - promote io.ErrUnexpectedEOF to msgp.ErrShortBytes --- msgp/read.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++ msgp/read_test.go | 45 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/msgp/read.go b/msgp/read.go index 6760cd7a..b3aaa350 100644 --- a/msgp/read.go +++ b/msgp/read.go @@ -146,6 +146,56 @@ func (m *Reader) Read(p []byte) (int, error) { return m.R.Read(p) } +// CopyNext reads the next object from m without decoding it and writes it to w. +// It avoids unnecessary copies internally. +func (m *Reader) CopyNext(w io.Writer) (int64, error) { + sz, o, err := getNextSize(m.R) + if err != nil { + return 0, err + } + + var n int64 + // Opportunistic optimization: if we can fit the whole thing in the m.R + // buffer, then just get a pointer to that, and pass it to w.Write, + // avoiding an allocation. + if int(sz) <= m.R.BufferSize() { + var nn int + var buf []byte + buf, err = m.R.Next(int(sz)) + if err != nil { + if err == io.ErrUnexpectedEOF { + err = ErrShortBytes + } + return 0, err + } + nn, err = w.Write(buf) + n += int64(nn) + } else { + // Fall back to io.CopyN. + // May avoid allocating if w is a ReaderFrom (e.g. bytes.Buffer) + n, err = io.CopyN(w, m.R, int64(sz)) + if err == io.ErrUnexpectedEOF { + err = ErrShortBytes + } + } + if err != nil { + return n, err + } else if n < int64(sz) { + return n, io.ErrShortWrite + } + + // for maps and slices, read elements + for x := uintptr(0); x < o; x++ { + var n2 int64 + n2, err = m.CopyNext(w) + if err != nil { + return n, err + } + n += n2 + } + return n, nil +} + // ReadFull implements `io.ReadFull` func (m *Reader) ReadFull(p []byte) (int, error) { return m.R.ReadFull(p) @@ -194,8 +244,10 @@ func (m *Reader) IsNil() bool { return err == nil && p[0] == mnil } +// getNextSize returns the size of the next object on the wire. // returns (obj size, obj elements, error) // only maps and arrays have non-zero obj elements +// for maps and arrays, obj size does not include elements // // use uintptr b/c it's guaranteed to be large enough // to hold whatever we can fit in memory. diff --git a/msgp/read_test.go b/msgp/read_test.go index 12b4dcfd..fd6a9922 100644 --- a/msgp/read_test.go +++ b/msgp/read_test.go @@ -745,3 +745,48 @@ func BenchmarkSkip(b *testing.B) { } } } + +func TestCopyNext(t *testing.T) { + var buf bytes.Buffer + en := NewWriter(&buf) + + en.WriteMapHeader(6) + + en.WriteString("thing_one") + en.WriteString("value_one") + + en.WriteString("thing_two") + en.WriteFloat64(3.14159) + + en.WriteString("some_bytes") + en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd")) + + en.WriteString("the_time") + en.WriteTime(time.Now()) + + en.WriteString("what?") + en.WriteBool(true) + + en.WriteString("ext") + en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")}) + + en.Flush() + + // Read from a copy of the original buf. + de := NewReader(bytes.NewReader(buf.Bytes())) + + w := new(bytes.Buffer) + + n, err := de.CopyNext(w) + if err != nil { + t.Fatal(err) + } + if n != int64(buf.Len()) { + t.Fatalf("CopyNext returned the wrong value (%d != %d)", + n, buf.Len()) + } + + if !bytes.Equal(buf.Bytes(), w.Bytes()) { + t.Fatalf("not equal! %v, %v", buf.Bytes(), w.Bytes()) + } +}