Use all remote-producers
Use all of the producers described with produce-from blocks rather than
just the first.

Use the standard config parsing entrypoint for parsing produce-from results
and get rid of the Partial function.
gastrodon committed Oct 10, 2024
1 parent 8f77279 commit f09564e
Showing 3 changed files with 123 additions and 27 deletions.
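Before this commit only the first produce-from block in a pipeline was turned into a producer; now every block is built and joined. As a rough illustration, a config that benefits looks like the snippet below, which reuses the "meta" test producer and its options from the new test in core/run_test.go:

# Both blocks are now compiled and run; previously only the first
# produce-from block was used.
produce-from "meta" {
  v = 100
  c = 5
}

produce-from "meta" {
  v = 50
  c = 10
}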
24 changes: 13 additions & 11 deletions core/build.go
@@ -6,7 +6,6 @@ import (
"sync"

"github.com/gastrodon/psyduck/parse"
"github.com/hashicorp/hcl/v2"
"github.com/psyduck-etl/sdk"
"github.com/sirupsen/logrus"
)
@@ -239,7 +238,7 @@ func nok[T any](e error) result[T] {
return result[T]{zero, e}
}

func collectProducer(descriptor *parse.PipelineDesc, context *hcl.EvalContext, library Library, logger *logrus.Logger) (func() <-chan result[sdk.Producer], error) {
func collectProducer(descriptor *parse.PipelineDesc, library Library, logger *logrus.Logger) (func() <-chan result[sdk.Producer], error) {
if descriptor.RemoteProducers != nil {
logger.Trace("getting remote producers")
remoteProducers := make([]sdk.Producer, len(descriptor.RemoteProducers))
@@ -255,6 +254,7 @@ func collectProducer(descriptor *parse.PipelineDesc, context *hcl.EvalContext, l
pMeta := joinProducers(remoteProducers, logger)

return func() <-chan result[sdk.Producer] {
libCtx := library.Ctx()
send := make(chan result[sdk.Producer])
go func() {
mSend, mErrs := make(chan []byte), make(chan error)
@@ -275,20 +275,22 @@ func collectProducer(descriptor *parse.PipelineDesc, context *hcl.EvalContext, l
return
}

parts, diags := parse.Partial("remote-producer", msg, context)
group, diags := parse.NewFile("remote-producer", msg).Pipelines(libCtx)
if diags.HasErrors() {
send <- nok[sdk.Producer](fmt.Errorf("failed to configure remote: %s", diags))
return
}

// Should context be re-used here
p, err := library.Producer(parts.Producers[0].Kind, parts.Producers[0].Options)
if err != nil {
send <- nok[sdk.Producer](fmt.Errorf("failed to compile remote: %s", err))
return
}
for _, desc := range group.Monify().Producers {
p, err := library.Producer(desc.Kind, desc.Options)
// Should context be re-used here
if err != nil {
send <- nok[sdk.Producer](fmt.Errorf("failed to compile remote: %s", err))
return
}

send <- ok(p)
send <- ok(p)
}
}
}
}()
@@ -333,7 +335,7 @@ and the resulting pipeline is returned.
*/
func BuildPipeline(descriptor *parse.PipelineDesc, library Library) (*Pipeline, error) {
logger := pipelineLogger()
producer, err := collectProducer(descriptor, library.Ctx(), library, logger)
producer, err := collectProducer(descriptor, library, logger)
if err != nil {
return nil, err
}
110 changes: 110 additions & 0 deletions core/run_test.go
@@ -4,9 +4,12 @@ import (
"errors"
"fmt"
"math"
"strings"
"testing"
"time"

"github.com/gastrodon/psyduck/parse"
"github.com/hashicorp/hcl/v2"
"github.com/psyduck-etl/sdk"
"github.com/sirupsen/logrus"
)
@@ -187,6 +190,113 @@ func Test_RunPipeline_filtering(test *testing.T) {
}
}

func drawDiags(d hcl.Diagnostics) string {
buf := make([]string, len(d))
for i, diag := range d {
buf[i] = diag.Error()
}

return strings.Join(buf, "\n")
}

func Test_RunPipeline_remote(t *testing.T) {
pin := &sdk.Plugin{
Name: "test",
Resources: []*sdk.Resource{
{
Kinds: sdk.PRODUCER,
Name: "mono",
Spec: []*sdk.Spec{},
ProvideProducer: func(parse sdk.Parser) (sdk.Producer, error) {
cfg := new(struct {
V byte `cty:"v"`
})

if err := parse(cfg); err != nil {
return nil, err
}

return func(send chan<- []byte, errs chan<- error) {
send <- []byte{cfg.V}
close(send)
}, nil
},
},
{
Kinds: sdk.PRODUCER,
Name: "meta",
Spec: []*sdk.Spec{},
ProvideProducer: func(parse sdk.Parser) (sdk.Producer, error) {
cfg := new(struct {
V byte `cty:"v"`
C int `cty:"c"`
})

if err := parse(cfg); err != nil {
return nil, err
}

return func(send chan<- []byte, errs chan<- error) {
for i := 0; i < cfg.C; i++ {
send <- []byte(fmt.Sprintf(`
produce "mono" {
v = %d
}`, cfg.V))
}
close(send)
}, nil
},
},
},
}

lib := NewLibrary([]*sdk.Plugin{pin})

group, diags := parse.NewFile("test-remote-producers", []byte(`
produce-from "meta" {
v = 100
c = 5
}
produce-from "meta" {
v = 50
c = 10
}`)).Pipelines(lib.Ctx())
if diags.HasErrors() {
t.Fatalf("failed to parse: %s", drawDiags(diags))
}

pline, err := BuildPipeline(group.Monify(), lib)
if err != nil {
t.Fatalf("failed to build: %s", err)
}

tabs := make(map[byte]int, 2)
alt := pline.Transformer
pline.Transformer = func(in []byte) ([]byte, error) {
if v, ok := tabs[in[0]]; ok {
tabs[in[0]] = v + 1
} else {
tabs[in[0]] = 1
}

return alt(in)
}

if err := RunPipeline(pline); err != nil {
t.Fatalf("failed to run: %s", err)
}

if tabs[100] != 5 {
t.Fatalf("tabs[100] mismatch: 5 != %d", tabs[100])
}

if tabs[50] != 10 {
t.Fatalf("tabs[50] mismatch: 10 != %d", tabs[50])

}
}

type testHook struct {
fn func()
}
16 changes: 0 additions & 16 deletions parse/pipeline.go
@@ -13,22 +13,6 @@ type pipelineParts struct {
Transformers []*MoverDesc `hcl:"transform,block"`
}

// TODO this should take a library.Ctx! it should look more like Literal
// update: this should be swap-in replaceable with Literal
func Partial(filename string, literal []byte, context *hcl.EvalContext) (*pipelineParts, hcl.Diagnostics) {
file, diags := hclparse.NewParser().ParseHCL(literal, filename)
if diags.HasErrors() {
return nil, diags
}

resources := new(pipelineParts)
if diags := gohcl.DecodeBody(file.Body, context, resources); diags.HasErrors() {
return nil, diags
}

return resources, nil
}

type MoverDesc struct {
Kind string `hcl:"resource,label" cty:"resource"`
Options map[string]cty.Value `hcl:",remain" cty:"options"`
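With Partial gone, remote-producer payloads go through the same entrypoint as regular pipeline files. A rough sketch of the replacement call pattern, mirroring the new code in collectProducer above; compileRemote itself is an illustrative helper (not part of the commit), assumed to live in package core alongside Library:

// compileRemote parses a remote payload with the standard entrypoint and
// compiles every producer it declares, not just the first.
func compileRemote(lib Library, msg []byte) ([]sdk.Producer, error) {
	group, diags := parse.NewFile("remote-producer", msg).Pipelines(lib.Ctx())
	if diags.HasErrors() {
		return nil, fmt.Errorf("failed to configure remote: %s", diags)
	}

	// Monify collapses the parsed group into a single pipeline descriptor,
	// as the new test does before calling BuildPipeline.
	descs := group.Monify().Producers
	producers := make([]sdk.Producer, 0, len(descs))
	for _, desc := range descs {
		p, err := lib.Producer(desc.Kind, desc.Options)
		if err != nil {
			return nil, fmt.Errorf("failed to compile remote: %s", err)
		}
		producers = append(producers, p)
	}

	return producers, nil
}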
