Skip to content

Commit

Permalink
Avoid recorder for io.ReadSeeker in anyio.NewReaderWithOpts (#4790)
Browse files Browse the repository at this point in the history
NewReaderWithOpts always creates a Recorder, which buffers its input and
can cause format detection to fail if the buffer is too small.  Avoid
that problem when the passed reader implements io.ReadSeeker by
modifying Track to create a Recorder only when the reader cannot seek.

Closes #4586.
  • Loading branch information
nwt authored Oct 2, 2023
1 parent fb9a8b3 commit 7cde67c
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 17 deletions.
10 changes: 4 additions & 6 deletions zio/anyio/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ func GzipReader(r io.Reader) (io.Reader, error) {
return rs, nil
}
}
recorder := NewRecorder(r)
track := NewTrack(recorder)
track := NewTrack(r)
// gzip.NewReader blocks until it reads ten bytes. readGzipID only
// reads two bytes.
if !readGzipID(track) {
return recorder, nil
return track.Reader(), nil
}
track.Reset()
_, err := gzip.NewReader(track)
if err == nil {
// create a new reader from recorder (track keeps a copy of read data)
return gzip.NewReader(recorder)
return gzip.NewReader(track.Reader())
}
return recorder, nil
return track.Reader(), nil
}

// RFC 1952, Section 2.3.1
Expand Down
17 changes: 8 additions & 9 deletions zio/anyio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,25 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
vngErr = errors.New("vng: auto-detection requires seekable input")
}

recorder := NewRecorder(r)
track := NewTrack(recorder)
track := NewTrack(r)

arrowsErr := isArrowStream(track)
if arrowsErr == nil {
return arrowio.NewReader(zctx, recorder)
return arrowio.NewReader(zctx, track.Reader())
}
arrowsErr = fmt.Errorf("arrows: %w", arrowsErr)
track.Reset()

zeekErr := match(zeekio.NewReader(zed.NewContext(), track), "zeek", 1)
if zeekErr == nil {
return zio.NopReadCloser(zeekio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(zeekio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

// ZJSON must come before JSON and ZSON since it is a subset of both.
zjsonErr := match(zjsonio.NewReader(zed.NewContext(), track), "zjson", 1)
if zjsonErr == nil {
return zio.NopReadCloser(zjsonio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(zjsonio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

Expand All @@ -93,13 +92,13 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
// sake of tests.
jsonErr := match(jsonio.NewReader(zed.NewContext(), track), "json", 10)
if jsonErr == nil {
return zio.NopReadCloser(jsonio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(jsonio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

zsonErr := match(zsonio.NewReader(zed.NewContext(), track), "zson", 1)
if zsonErr == nil {
return zio.NopReadCloser(zsonio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(zsonio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

Expand All @@ -113,7 +112,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
// Close zngReader to ensure that it does not continue to call track.Read.
zngReader.Close()
if zngErr == nil {
return zngio.NewReaderWithOpts(zctx, recorder, opts.ZNG), nil
return zngio.NewReaderWithOpts(zctx, track.Reader(), opts.ZNG), nil
}
track.Reset()

Expand All @@ -126,7 +125,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
track.Reset()
csvErr = match(csvio.NewReader(zed.NewContext(), track, opts.CSV), "csv", 1)
if csvErr == nil {
return zio.NopReadCloser(csvio.NewReader(zctx, recorder, opts.CSV)), nil
return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), opts.CSV)), nil
}
}
track.Reset()
Expand Down
31 changes: 29 additions & 2 deletions zio/anyio/track.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,51 @@
package anyio

import "io"

// TrackSize is set to InitBufferSize, which is declared elsewhere in this
// package.
// NOTE(review): presumably the Recorder's initial buffer size — confirm
// against the Recorder implementation.
const TrackSize = InitBufferSize

// Track is a rewindable view of an input stream.  It operates in one of two
// modes: when rs is non-nil, reads and resets go straight to the seekable
// source; otherwise they go through recorder using the offset off.
type Track struct {
// rs is the seekable source.  A non-nil rs selects seek-based mode.
rs io.ReadSeeker
// initial is the offset rs reported when the Track was created; Reset
// seeks back to it.
initial int64

// recorder supplies the data when rs is nil.
recorder *Recorder
// off is the offset handed to recorder.ReadAt by the next Read; Reset
// sets it back to zero.
off int
}

// NewTrack returns a Track that reads from r.  If r implements io.ReadSeeker
// and reports its current offset without error, the Track reads from r
// directly and remembers that offset as the position to return to.
// Otherwise r is wrapped with NewRecorder and the Track reads through the
// resulting Recorder.
func NewTrack(r *Recorder) *Track {
func NewTrack(r io.Reader) *Track {
if rs, ok := r.(io.ReadSeeker); ok {
// Seek(0, io.SeekCurrent) moves nothing; it only reports the current
// offset.  A failure here means r cannot really seek, so fall through to
// the Recorder path.
if n, err := rs.Seek(0, io.SeekCurrent); err == nil {
return &Track{rs: rs, initial: n}
}
}
return &Track{
recorder: r,
recorder: NewRecorder(r),
}
}

// Reset rewinds the Track so that the next Read starts again from the
// position at which tracking began.
func (t *Track) Reset() {
	if t.rs == nil {
		// Recorder mode: the next Read passes offset zero to the recorder.
		t.off = 0
		return
	}
	// Seekable mode: return to the offset captured when the Track was
	// created.  The result of Seek is deliberately discarded on the
	// assumption that a later Read on the same source will fail as well.
	_, _ = t.rs.Seek(t.initial, io.SeekStart)
}

// Read implements io.Reader.  With a seekable source it reads directly from
// that source; otherwise it reads from the recorder at the Track's current
// offset and advances the offset by the number of bytes returned.
func (t *Track) Read(b []byte) (int, error) {
	if t.rs == nil {
		count, err := t.recorder.ReadAt(t.off, b)
		t.off += count
		return count, err
	}
	return t.rs.Read(b)
}

// Reader returns the reader backing this Track.  In recorder mode that is
// the recorder itself.  In seekable mode the Track is Reset first, so the
// returned source is positioned at the offset where tracking began.
func (t *Track) Reader() io.Reader {
	if t.rs == nil {
		return t.recorder
	}
	t.Reset()
	return t.rs
}
23 changes: 23 additions & 0 deletions zio/anyio/ztests/huge.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
script: |
! yes ' ' | head -c $((11 * 1024 * 1024)) > huge.zson
echo 0 >> huge.zson
zq -z huge.zson
! cat huge.zson | zq -z -
outputs:
- name: stdout
data: |
0
- name: stderr
data: |
stdio:stdin: format detection error
arrows: schema message length exceeds 1 MiB
csv: line 1: no comma found
json: buffer exceeded max size trying to infer input format
line: auto-detection not supported
parquet: auto-detection requires seekable input
vng: auto-detection requires seekable input
zeek: line 1: bad types/fields definition in zeek header
zjson: line 1: malformed ZJSON: bad type object: "": unpacker error parsing JSON: unexpected end of JSON input
zng: buffer exceeded max size trying to infer input format
zson: buffer exceeded max size trying to infer input format

0 comments on commit 7cde67c

Please sign in to comment.