Skip to content

Commit

Permalink
output operator (#5191)
Browse files Browse the repository at this point in the history
Add output operator which allows users to set the name of the resultant
channel. Channels are no longer identified by an integer but instead
with a string- if a channel doesn't not have an output it will be
assigned to channel "main".
  • Loading branch information
mattnibs authored Aug 6, 2024
1 parent 038a6ef commit 343ac63
Show file tree
Hide file tree
Showing 50 changed files with 2,462 additions and 2,187 deletions.
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ type QueryRequest struct {
}

type QueryChannelSet struct {
ChannelID int `json:"channel_id" zed:"channel_id"`
Channel string `json:"channel" zed:"channel"`
}

type QueryChannelEnd struct {
ChannelID int `json:"channel_id" zed:"channel_id"`
Channel string `json:"channel" zed:"channel"`
}

type QueryError struct {
Expand Down
4 changes: 2 additions & 2 deletions api/queryio/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (q *Query) Read() (*zed.Value, error) {
func controlToError(ctrl interface{}) error {
switch ctrl := ctrl.(type) {
case *api.QueryChannelSet:
return &zbuf.Control{Message: zbuf.SetChannel(ctrl.ChannelID)}
return &zbuf.Control{Message: zbuf.SetChannel(ctrl.Channel)}
case *api.QueryChannelEnd:
return &zbuf.Control{Message: zbuf.EndChannel(ctrl.ChannelID)}
return &zbuf.Control{Message: zbuf.EndChannel(ctrl.Channel)}
case *api.QueryStats:
return &zbuf.Control{Message: ctrl.Progress}
case *api.QueryError:
Expand Down
15 changes: 7 additions & 8 deletions api/queryio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type controlWriter interface {
}

type Writer struct {
cid int
channel string
start nano.Ts
writer zio.WriteCloser
ctrl bool
Expand All @@ -27,7 +27,6 @@ type Writer struct {

func NewWriter(w io.WriteCloser, format string, flusher http.Flusher, ctrl bool) (*Writer, error) {
d := &Writer{
cid: -1,
ctrl: ctrl,
start: nano.Now(),
flusher: flusher,
Expand All @@ -51,19 +50,19 @@ func NewWriter(w io.WriteCloser, format string, flusher http.Flusher, ctrl bool)
return d, err
}

func (w *Writer) WriteBatch(cid int, batch zbuf.Batch) error {
if w.cid != cid {
w.cid = cid
if err := w.WriteControl(api.QueryChannelSet{ChannelID: cid}); err != nil {
func (w *Writer) WriteBatch(channel string, batch zbuf.Batch) error {
if w.channel != channel {
w.channel = channel
if err := w.WriteControl(api.QueryChannelSet{Channel: channel}); err != nil {
return err
}
}
defer batch.Unref()
return zbuf.WriteBatch(w.writer, batch)
}

func (w *Writer) WhiteChannelEnd(channelID int) error {
return w.WriteControl(api.QueryChannelEnd{ChannelID: channelID})
func (w *Writer) WhiteChannelEnd(channel string) error {
return w.WriteControl(api.QueryChannelEnd{Channel: channel})
}

func (w *Writer) WriteProgress(stats zbuf.Progress) error {
Expand Down
8 changes: 4 additions & 4 deletions api/queryio/zjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import (
func TestZJSONWriter(t *testing.T) {
const record = `{x:1}`
const expected = `
{"type":"QueryChannelSet","value":{"channel_id":1}}
{"type":"QueryChannelSet","value":{"channel":"main"}}
{"type":{"kind":"record","id":30,"fields":[{"name":"x","type":{"kind":"primitive","name":"int64"}}]},"value":["1"]}
{"type":"QueryChannelEnd","value":{"channel_id":1}}
{"type":"QueryChannelEnd","value":{"channel":"main"}}
{"type":"QueryError","value":{"error":"test.err"}}
`
var buf bytes.Buffer
w := queryio.NewZJSONWriter(&buf)
err := w.WriteControl(api.QueryChannelSet{ChannelID: 1})
err := w.WriteControl(api.QueryChannelSet{Channel: "main"})
require.NoError(t, err)
arena := zed.NewArena()
defer arena.Unref()
err = w.Write(zson.MustParseValue(zed.NewContext(), arena, record))
require.NoError(t, err)
err = w.WriteControl(api.QueryChannelEnd{ChannelID: 1})
err = w.WriteControl(api.QueryChannelEnd{Channel: "main"})
require.NoError(t, err)
err = w.WriteControl(api.QueryError{Error: "test.err"})
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions compiler/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,11 @@ type (
Expr Expr `json:"expr"`
Text string `json:"text"`
}
Output struct {
Kind string `json:"kind" unpack:""`
KeywordPos int `json:"keyword_pos"`
Name *ID `json:"name"`
}
)

// Source structure
Expand Down Expand Up @@ -775,6 +780,7 @@ func (*Yield) OpAST() {}
func (*Sample) OpAST() {}
func (*Load) OpAST() {}
func (*Assert) OpAST() {}
func (*Output) OpAST() {}

func (x *Scope) Pos() int {
if x.Decls != nil {
Expand Down Expand Up @@ -810,6 +816,7 @@ func (x *Yield) Pos() int { return x.KeywordPos }
func (x *Sample) Pos() int { return x.KeywordPos }
func (x *Load) Pos() int { return x.KeywordPos }
func (x *Assert) Pos() int { return x.KeywordPos }
func (x *Output) Pos() int { return x.KeywordPos }

func (x *Scope) End() int { return x.Body.End() }
func (x *Parallel) End() int { return x.Rparen }
Expand Down Expand Up @@ -918,6 +925,7 @@ func (x *Sample) End() int {
}
func (x *Load) End() int { return x.EndPos }
func (x *Assert) End() int { return x.Expr.End() }
func (x *Output) End() int { return x.Name.End() }

// An Agg is an AST node that represents a aggregate function. The Name
// field indicates the aggregation method while the Expr field indicates
Expand Down
5 changes: 5 additions & 0 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type (
Vars []Def `json:"vars"`
Body Seq `json:"body"`
}
Output struct {
Kind string `json:"kind" unpack:""`
Name string `json:"name"`
}
Pass struct {
Kind string `json:"kind" unpack:""`
}
Expand Down Expand Up @@ -313,6 +317,7 @@ func (*Merge) OpNode() {}
func (*Combine) OpNode() {}
func (*Scope) OpNode() {}
func (*Load) OpNode() {}
func (*Output) OpNode() {}

// NewFilter returns a filter node for e.
func NewFilter(e Expr) *Filter {
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var unpacker = unpack.New(
MapCall{},
MapExpr{},
Merge{},
Output{},
Over{},
OverExpr{},
Pass{},
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var unpacker = unpack.New(
Join{},
Load{},
Merge{},
Output{},
Over{},
Trunk{},
astzed.Map{},
Expand Down
27 changes: 27 additions & 0 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (*Pool) Source() {}
func (*Path) Source() {}

type Channel struct {
Name string `json:"name"`
AggregationKeys field.List `json:"aggregation_keys"`
Sort *order.SortKey `json:"sort"`
}
Expand Down Expand Up @@ -82,17 +83,30 @@ func AnalyzeDAG(ctx context.Context, entry dag.Seq, src *data.Source, head *lake
return nil, err
}
aggKeys := describeAggs(entry, []field.List{nil})
outputs := collectOutputs(entry)
m := make(map[string]int)
for i := range sortKeys {
// Convert SortKey to a pointer so a nil sort is encoded as null for
// JSON/ZSON.
var s *order.SortKey
if !sortKeys[i].IsNil() {
s = &sortKeys[i]
}
name := outputs[i].Name
if k, ok := m[name]; ok {
// If output already exists, this means the outputs will be
// combined so nil everything out.
// XXX This is currently what happens but is this right?
c := &info.Channels[k]
c.Sort, c.AggregationKeys = nil, nil
continue
}
info.Channels = append(info.Channels, Channel{
Name: name,
Sort: s,
AggregationKeys: aggKeys[i],
})
m[name] = i
}
return &info, nil
}
Expand Down Expand Up @@ -183,3 +197,16 @@ func describeOpAggs(op dag.Op, parents []field.List) []field.List {
}
return parents
}

func collectOutputs(seq dag.Seq) []*dag.Output {
var outputs []*dag.Output
optimizer.Walk(seq, func(seq dag.Seq) dag.Seq {
if len(seq) > 0 {
if o, ok := seq[len(seq)-1].(*dag.Output); ok {
outputs = append(outputs, o)
}
}
return seq
})
return outputs
}
8 changes: 5 additions & 3 deletions compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Job struct {
rctx *runtime.Context
builder *kernel.Builder
optimizer *optimizer.Optimizer
outputs []zbuf.Puller
outputs map[string]zbuf.Puller
puller zbuf.Puller
entry dag.Seq
}
Expand Down Expand Up @@ -123,7 +123,9 @@ func (j *Job) Puller() zbuf.Puller {
case 0:
return nil
case 1:
j.puller = op.NewCatcher(op.NewSingle(outputs[0]))
for k, p := range outputs {
j.puller = op.NewCatcher(op.NewSingle(k, p))
}
default:
j.puller = op.NewMux(j.rctx, outputs)
}
Expand Down Expand Up @@ -190,7 +192,7 @@ func VectorFilterCompile(rctx *runtime.Context, query string, src *data.Source,
}
return nil, err
}
if len(entry) != 1 {
if len(entry) != 2 {
return nil, errors.New("filter query must have a single op")
}
f, ok := entry[0].(*dag.Filter)
Expand Down
21 changes: 19 additions & 2 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Builder struct {
readers []zio.Reader
progress *zbuf.Progress
arena *zed.Arena // For zed.Values created during compilation.
channels map[string][]zbuf.Puller
deletes *sync.Map
udfs map[string]dag.Expr
compiledUDFs map[string]*expr.UDF
Expand All @@ -77,6 +78,7 @@ func NewBuilder(rctx *runtime.Context, source *data.Source) *Builder {
RecordsMatched: 0,
},
arena: arena,
channels: make(map[string][]zbuf.Puller),
udfs: make(map[string]dag.Expr),
compiledUDFs: make(map[string]*expr.UDF),
}
Expand All @@ -90,12 +92,24 @@ func (b *Builder) clone(arena *zed.Arena) *Builder {

// Build builds a flowgraph for seq. If seq contains a dag.DefaultSource, it
// will read from readers.
func (b *Builder) Build(seq dag.Seq, readers ...zio.Reader) ([]zbuf.Puller, error) {
func (b *Builder) Build(seq dag.Seq, readers ...zio.Reader) (map[string]zbuf.Puller, error) {
if !isEntry(seq) {
return nil, errors.New("internal error: DAG entry point is not a data source")
}
b.readers = readers
return b.compileSeq(seq, nil)

if _, err := b.compileSeq(seq, nil); err != nil {
return nil, err
}
channels := make(map[string]zbuf.Puller)
for key, pullers := range b.channels {
if len(pullers) == 1 {
channels[key] = pullers[0]
} else {
channels[key] = combine.New(b.rctx, pullers)
}
}
return channels, nil
}

func (b *Builder) BuildWithPuller(seq dag.Seq, parent vector.Puller) ([]vector.Puller, error) {
Expand Down Expand Up @@ -375,6 +389,9 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
}
//XXX
return nil, errors.New("dag.Vectorize must begin with SeqScan")
case *dag.Output:
b.channels[v.Name] = append(b.channels[v.Name], parent)
return parent, nil
default:
return nil, fmt.Errorf("unknown DAG operator type: %v", v)
}
Expand Down
3 changes: 3 additions & 0 deletions compiler/kernel/vop.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (b *Builder) compileVamLeaf(o dag.Op, parent vector.Puller) (vector.Puller,
return nil, err
}
return vamop.NewYield(b.rctx.Zctx, parent, exprs), nil
case *dag.Output:
// XXX Ignore Output op for vectors for now.
return parent, nil
default:
return nil, fmt.Errorf("internal error: unknown dag.Op while compiling for vector runtime: %#v", o)
}
Expand Down
2 changes: 1 addition & 1 deletion compiler/optimizer/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (o *Optimizer) analyzeSortKey(op dag.Op, in order.SortKey) (order.SortKey,
case *dag.Lister:
// This shouldn't happen.
return order.Nil, errors.New("internal error: dag.Lister encountered in anaylzeSortKey")
case *dag.Filter, *dag.Head, *dag.Pass, *dag.Uniq, *dag.Tail, *dag.Fuse:
case *dag.Filter, *dag.Head, *dag.Pass, *dag.Uniq, *dag.Tail, *dag.Fuse, *dag.Output:
return in, nil
case *dag.Cut:
return analyzeCuts(op.Args, in), nil
Expand Down
18 changes: 15 additions & 3 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func removePassOps(seq dag.Seq) dag.Seq {
})
}

func Walk(seq dag.Seq, post func(dag.Seq) dag.Seq) dag.Seq {
return walk(seq, true, post)
}

func walk(seq dag.Seq, over bool, post func(dag.Seq) dag.Seq) dag.Seq {
for _, op := range seq {
switch op := op.(type) {
Expand Down Expand Up @@ -141,7 +145,7 @@ func (o *Optimizer) Optimize(seq dag.Seq) (dag.Seq, error) {
}

func (o *Optimizer) OptimizeDeleter(seq dag.Seq, replicas int) (dag.Seq, error) {
if len(seq) != 2 {
if len(seq) != 3 {
return nil, errors.New("internal error: bad deleter structure")
}
scan, ok := seq[0].(*dag.DeleteScan)
Expand All @@ -152,6 +156,10 @@ func (o *Optimizer) OptimizeDeleter(seq dag.Seq, replicas int) (dag.Seq, error)
if !ok {
return nil, errors.New("internal error: bad deleter structure")
}
output, ok := seq[2].(*dag.Output)
if !ok {
return nil, errors.New("internal error: bad deleter structure")
}
lister := &dag.Lister{
Kind: "Lister",
Pool: scan.ID,
Expand Down Expand Up @@ -182,7 +190,7 @@ func (o *Optimizer) OptimizeDeleter(seq dag.Seq, replicas int) (dag.Seq, error)
Order: sortKey.Order,
}
}
return dag.Seq{lister, scatter, merge}, nil
return dag.Seq{lister, scatter, merge, output}, nil
}

func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
Expand Down Expand Up @@ -247,7 +255,11 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
// in a normal filtering operation.
op.KeyPruner = maybeNewRangePruner(filter, sortKey)
// Delete the downstream operators when we are tapping the object list.
seq = dag.Seq{op}
o, ok := seq[len(seq)-1].(*dag.Output)
if !ok {
o = &dag.Output{Kind: "Output", Name: "main"}
}
seq = dag.Seq{op, o}
}
case *dag.DefaultScan:
op.Filter = filter
Expand Down
2 changes: 1 addition & 1 deletion compiler/optimizer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (o *Optimizer) concurrentPath(ops []dag.Op, sortKey order.SortKey) (int, or
// upstream sort is the same as the Load destination sort we
// request a merge and set the Load operator to do a sorted write.
return k, order.Nil, false, false, nil
case *dag.Fork, *dag.Scatter, *dag.Head, *dag.Tail, *dag.Uniq, *dag.Fuse, *dag.Join:
case *dag.Fork, *dag.Scatter, *dag.Head, *dag.Tail, *dag.Uniq, *dag.Fuse, *dag.Join, *dag.Output:
return k, sortKey, true, true, nil
default:
next, err := o.analyzeSortKey(op, sortKey)
Expand Down
Loading

0 comments on commit 343ac63

Please sign in to comment.