Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid recorder for io.ReadSeeker in anyio.NewReaderWithOpts #4790

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions zio/anyio/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ func GzipReader(r io.Reader) (io.Reader, error) {
return rs, nil
}
}
recorder := NewRecorder(r)
track := NewTrack(recorder)
track := NewTrack(r)
// gzip.NewReader blocks until it reads ten bytes. readGzipID only
// reads two bytes.
if !readGzipID(track) {
return recorder, nil
return track.Reader(), nil
}
track.Reset()
_, err := gzip.NewReader(track)
if err == nil {
// create a new reader from recorder (track keeps a copy of read data)
return gzip.NewReader(recorder)
return gzip.NewReader(track.Reader())
}
return recorder, nil
return track.Reader(), nil
}

// RFC 1952, Section 2.3.1
Expand Down
17 changes: 8 additions & 9 deletions zio/anyio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,25 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
vngErr = errors.New("vng: auto-detection requires seekable input")
}

recorder := NewRecorder(r)
track := NewTrack(recorder)
track := NewTrack(r)

arrowsErr := isArrowStream(track)
if arrowsErr == nil {
return arrowio.NewReader(zctx, recorder)
return arrowio.NewReader(zctx, track.Reader())
}
arrowsErr = fmt.Errorf("arrows: %w", arrowsErr)
track.Reset()

zeekErr := match(zeekio.NewReader(zed.NewContext(), track), "zeek", 1)
if zeekErr == nil {
return zio.NopReadCloser(zeekio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(zeekio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

// ZJSON must come before JSON and ZSON since it is a subset of both.
zjsonErr := match(zjsonio.NewReader(zed.NewContext(), track), "zjson", 1)
if zjsonErr == nil {
return zio.NopReadCloser(zjsonio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(zjsonio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

Expand All @@ -93,13 +92,13 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
// sake of tests.
jsonErr := match(jsonio.NewReader(zed.NewContext(), track), "json", 10)
if jsonErr == nil {
return zio.NopReadCloser(jsonio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(jsonio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

zsonErr := match(zsonio.NewReader(zed.NewContext(), track), "zson", 1)
if zsonErr == nil {
return zio.NopReadCloser(zsonio.NewReader(zctx, recorder)), nil
return zio.NopReadCloser(zsonio.NewReader(zctx, track.Reader())), nil
}
track.Reset()

Expand All @@ -113,7 +112,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
// Close zngReader to ensure that it does not continue to call track.Read.
zngReader.Close()
if zngErr == nil {
return zngio.NewReaderWithOpts(zctx, recorder, opts.ZNG), nil
return zngio.NewReaderWithOpts(zctx, track.Reader(), opts.ZNG), nil
}
track.Reset()

Expand All @@ -126,7 +125,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
track.Reset()
csvErr = match(csvio.NewReader(zed.NewContext(), track, opts.CSV), "csv", 1)
if csvErr == nil {
return zio.NopReadCloser(csvio.NewReader(zctx, recorder, opts.CSV)), nil
return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), opts.CSV)), nil
}
}
track.Reset()
Expand Down
31 changes: 29 additions & 2 deletions zio/anyio/track.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,51 @@
package anyio

import "io"

const TrackSize = InitBufferSize

type Track struct {
rs io.ReadSeeker
initial int64

recorder *Recorder
off int
}

func NewTrack(r *Recorder) *Track {
func NewTrack(r io.Reader) *Track {
if rs, ok := r.(io.ReadSeeker); ok {
if n, err := rs.Seek(0, io.SeekCurrent); err == nil {
return &Track{rs: rs, initial: n}
}
}
return &Track{
recorder: r,
recorder: NewRecorder(r),
}
}

func (t *Track) Reset() {
if t.rs != nil {
// We ignore errors here under the assumption that a subsequent
// call to Read will also fail.
t.rs.Seek(t.initial, io.SeekStart)
return
}
t.off = 0
}

func (t *Track) Read(b []byte) (int, error) {
if t.rs != nil {
return t.rs.Read(b)
}
n, err := t.recorder.ReadAt(t.off, b)
t.off += n
return n, err
}

func (t *Track) Reader() io.Reader {
if t.rs != nil {
t.Reset()
return t.rs
}
return t.recorder
}
23 changes: 23 additions & 0 deletions zio/anyio/ztests/huge.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
script: |
! yes ' ' | head -c $((11 * 1024 * 1024)) > huge.zson
echo 0 >> huge.zson
zq -z huge.zson
! cat huge.zson | zq -z -

outputs:
- name: stdout
data: |
0
- name: stderr
data: |
stdio:stdin: format detection error
arrows: schema message length exceeds 1 MiB
csv: line 1: no comma found
json: buffer exceeded max size trying to infer input format
line: auto-detection not supported
parquet: auto-detection requires seekable input
vng: auto-detection requires seekable input
zeek: line 1: bad types/fields definition in zeek header
zjson: line 1: malformed ZJSON: bad type object: "": unpacker error parsing JSON: unexpected end of JSON input
zng: buffer exceeded max size trying to infer input format
zson: buffer exceeded max size trying to infer input format