Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add -unbuffered flag to zed and zq #4320

Merged
merged 1 commit into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cli/outputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/pkg/terminal"
"github.com/brimdata/zed/pkg/terminal/color"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/anyio"
"github.com/brimdata/zed/zio/emitter"
Expand All @@ -31,6 +32,7 @@ type Flags struct {
zsonPretty bool
zsonPersist string
color bool
unbuffered bool
}

func (f *Flags) Options() anyio.WriterOpts {
Expand Down Expand Up @@ -58,6 +60,7 @@ func (f *Flags) setFlags(fs *flag.FlagSet) {
"split output into one file per data type in this directory (but see -splitsize)")
fs.Var(&f.splitSize, "splitsize",
"if >0 and -split is set, split into files at least this big rather than by data type")
fs.BoolVar(&f.unbuffered, "unbuffered", false, "disable output buffering")
fs.StringVar(&f.outputFile, "o", "", "write data to output file")
}

Expand Down Expand Up @@ -112,6 +115,9 @@ func (f *Flags) Init() error {
f.Format = "zson"
f.ZSON.Pretty = 0
}
if f.unbuffered {
zbuf.ScannerBatchSize = 1
}
return nil
}

Expand All @@ -126,9 +132,9 @@ func (f *Flags) Open(ctx context.Context, engine storage.Engine) (zio.WriteClose
return nil, fmt.Errorf("-split option: %w", err)
}
if size := f.splitSize.Bytes; size > 0 {
return emitter.NewSizeSplitter(ctx, engine, dir, f.outputFile, f.WriterOpts, int64(size))
return emitter.NewSizeSplitter(ctx, engine, dir, f.outputFile, f.unbuffered, f.WriterOpts, int64(size))
}
d, err := emitter.NewSplit(ctx, engine, dir, f.outputFile, f.WriterOpts)
d, err := emitter.NewSplit(ctx, engine, dir, f.outputFile, f.unbuffered, f.WriterOpts)
if err != nil {
return nil, err
}
Expand All @@ -137,7 +143,7 @@ func (f *Flags) Open(ctx context.Context, engine storage.Engine) (zio.WriteClose
if f.outputFile == "" && f.color && terminal.IsTerminalFile(os.Stdout) {
color.Enabled = true
}
w, err := emitter.NewFileFromPath(ctx, engine, f.outputFile, f.WriterOpts)
w, err := emitter.NewFileFromPath(ctx, engine, f.outputFile, f.unbuffered, f.WriterOpts)
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/zq/ztests/unbuffered.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
script: |
mkfifo fifo
# "-i json" avoids reader buffering.
zq -i json -unbuffered -z fifo > out.zson &
# Prevent zq from seeing EOF on fifo and exiting before the shell exits.
exec {fd}> fifo
echo 1 > fifo
# Wait for out.zson to have size greater than zero.
while [ ! -s out.zson -a $((i++)) < 50 ]; do sleep 0.1; done
# Get out.zson contents now, before zq exits.
cat out.zson

outputs:
- name: stdout
data: |
1
17 changes: 7 additions & 10 deletions zio/emitter/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/brimdata/zed/zio/anyio"
)

func NewFileFromPath(ctx context.Context, engine storage.Engine, path string, opts anyio.WriterOpts) (zio.WriteCloser, error) {
func NewFileFromPath(ctx context.Context, engine storage.Engine, path string, unbuffered bool, opts anyio.WriterOpts) (zio.WriteCloser, error) {
if path == "" {
path = "stdio:stdout"
}
uri, err := storage.ParseURI(path)
if err != nil {
return nil, err
}
return NewFileFromURI(ctx, engine, uri, opts)
return NewFileFromURI(ctx, engine, uri, unbuffered, opts)
}

func IsTerminal(w io.Writer) bool {
Expand All @@ -33,23 +33,20 @@ func IsTerminal(w io.Writer) bool {
return false
}

func NewFileFromURI(ctx context.Context, engine storage.Engine, path *storage.URI, opts anyio.WriterOpts) (zio.WriteCloser, error) {
func NewFileFromURI(ctx context.Context, engine storage.Engine, path *storage.URI, unbuffered bool, opts anyio.WriterOpts) (zio.WriteCloser, error) {
f, err := engine.Put(ctx, path)
if err != nil {
return nil, err
}

var wc io.WriteCloser
wc := io.WriteCloser(f)
if path.Scheme == "stdio" {
// Don't close stdio in case we live inside something
// that has multiple stdio users.
wc = zio.NopCloser(f)
}
if !unbuffered && !IsTerminal(f) {
// Don't buffer terminal output.
if !IsTerminal(f) {
wc = bufwriter.New(wc)
}
} else {
wc = bufwriter.New(f)
wc = bufwriter.New(wc)
}
// On close, zio.WriteCloser.Close will close and flush the
// downstream writer, which will flush the bufwriter here and,
Expand Down
35 changes: 20 additions & 15 deletions zio/emitter/sizesplitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
)

type sizeSplitter struct {
ctx context.Context
engine storage.Engine
dir *storage.URI
prefix string
opts anyio.WriterOpts
size int64
ctx context.Context
engine storage.Engine
dir *storage.URI
prefix string
unbuffered bool
opts anyio.WriterOpts
size int64

cwc countingWriteCloser
ext string
Expand All @@ -32,7 +33,7 @@ type sizeSplitter struct {
// creating a new file after the current one reaches size bytes. Files may
// exceed size substantially due to buffering in the underlying writer as
// determined by opts.Format.
func NewSizeSplitter(ctx context.Context, engine storage.Engine, dir *storage.URI, prefix string,
func NewSizeSplitter(ctx context.Context, engine storage.Engine, dir *storage.URI, prefix string, unbuffered bool,
opts anyio.WriterOpts, size int64) (zio.WriteCloser, error) {
ext := zio.Extension(opts.Format)
if ext == "" {
Expand All @@ -42,13 +43,14 @@ func NewSizeSplitter(ctx context.Context, engine storage.Engine, dir *storage.UR
prefix = prefix + "-"
}
return &sizeSplitter{
ctx: ctx,
engine: engine,
dir: dir,
prefix: prefix,
opts: opts,
size: size,
ext: ext,
ctx: ctx,
engine: engine,
dir: dir,
prefix: prefix,
unbuffered: unbuffered,
opts: opts,
size: size,
ext: ext,
}, nil
}

Expand Down Expand Up @@ -84,7 +86,10 @@ func (s *sizeSplitter) nextFile() error {
if err != nil {
return err
}
s.cwc = countingWriteCloser{bufwriter.New(wc), 0}
if s.unbuffered {
wc = bufwriter.New(wc)
}
s.cwc = countingWriteCloser{wc, 0}
s.zwc, err = anyio.NewWriter(&s.cwc, s.opts)
if err != nil {
wc.Close()
Expand Down
38 changes: 20 additions & 18 deletions zio/emitter/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ import (
)

type Split struct {
ctx context.Context
dir *storage.URI
prefix string
ext string
opts anyio.WriterOpts
writers map[zed.Type]zio.WriteCloser
seen map[string]struct{}
engine storage.Engine
ctx context.Context
dir *storage.URI
prefix string
unbuffered bool
ext string
opts anyio.WriterOpts
writers map[zed.Type]zio.WriteCloser
seen map[string]struct{}
engine storage.Engine
}

var _ zio.Writer = (*Split)(nil)

func NewSplit(ctx context.Context, engine storage.Engine, dir *storage.URI, prefix string, opts anyio.WriterOpts) (*Split, error) {
func NewSplit(ctx context.Context, engine storage.Engine, dir *storage.URI, prefix string, unbuffered bool, opts anyio.WriterOpts) (*Split, error) {
e := zio.Extension(opts.Format)
if e == "" {
return nil, fmt.Errorf("unknown format: %s", opts.Format)
Expand All @@ -33,14 +34,15 @@ func NewSplit(ctx context.Context, engine storage.Engine, dir *storage.URI, pref
prefix = prefix + "-"
}
return &Split{
ctx: ctx,
dir: dir,
prefix: prefix,
ext: e,
opts: opts,
writers: make(map[zed.Type]zio.WriteCloser),
seen: make(map[string]struct{}),
engine: engine,
ctx: ctx,
dir: dir,
prefix: prefix,
unbuffered: unbuffered,
ext: e,
opts: opts,
writers: make(map[zed.Type]zio.WriteCloser),
seen: make(map[string]struct{}),
engine: engine,
}, nil
}

Expand All @@ -58,7 +60,7 @@ func (s *Split) lookupOutput(val *zed.Value) (zio.WriteCloser, error) {
if ok {
return w, nil
}
w, err := NewFileFromURI(s.ctx, s.engine, s.path(val), s.opts)
w, err := NewFileFromURI(s.ctx, s.engine, s.path(val), s.unbuffered, s.opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion zio/emitter/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestDirS3Source(t *testing.T) {

r := zsonio.NewReader(zed.NewContext(), strings.NewReader(input))
require.NoError(t, err)
w, err := NewSplit(context.Background(), engine, uri, "", anyio.WriterOpts{Format: "zson"})
w, err := NewSplit(context.Background(), engine, uri, "", false, anyio.WriterOpts{Format: "zson"})
require.NoError(t, err)
require.NoError(t, zio.Copy(w, r))
}
Expand Down