diff --git a/core/build.go b/core/build.go index adc15ac..08c31a5 100644 --- a/core/build.go +++ b/core/build.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/gastrodon/psyduck/parse" - "github.com/hashicorp/hcl/v2" "github.com/psyduck-etl/sdk" "github.com/sirupsen/logrus" ) @@ -239,7 +238,7 @@ func nok[T any](e error) result[T] { return result[T]{zero, e} } -func collectProducer(descriptor *parse.PipelineDesc, context *hcl.EvalContext, library Library, logger *logrus.Logger) (func() <-chan result[sdk.Producer], error) { +func collectProducer(descriptor *parse.PipelineDesc, library Library, logger *logrus.Logger) (func() <-chan result[sdk.Producer], error) { if descriptor.RemoteProducers != nil { logger.Trace("getting remote producers") remoteProducers := make([]sdk.Producer, len(descriptor.RemoteProducers)) @@ -255,6 +254,7 @@ func collectProducer(descriptor *parse.PipelineDesc, context *hcl.EvalContext, l pMeta := joinProducers(remoteProducers, logger) return func() <-chan result[sdk.Producer] { + libCtx := library.Ctx() send := make(chan result[sdk.Producer]) go func() { mSend, mErrs := make(chan []byte), make(chan error) @@ -275,20 +275,22 @@ func collectProducer(descriptor *parse.PipelineDesc, context *hcl.EvalContext, l return } - parts, diags := parse.Partial("remote-producer", msg, context) + group, diags := parse.NewFile("remote-producer", msg).Pipelines(libCtx) if diags.HasErrors() { send <- nok[sdk.Producer](fmt.Errorf("failed to configure remote: %s", diags)) return } - // Should context be re-used here - p, err := library.Producer(parts.Producers[0].Kind, parts.Producers[0].Options) - if err != nil { - send <- nok[sdk.Producer](fmt.Errorf("failed to compile remote: %s", err)) - return - } + for _, desc := range group.Monify().Producers { + p, err := library.Producer(desc.Kind, desc.Options) + // Should context be re-used here + if err != nil { + send <- nok[sdk.Producer](fmt.Errorf("failed to compile remote: %s", err)) + return + } - send <- ok(p) + send <- ok(p) + } } } }() @@ -333,7 +335,7 @@ and the resulting pipeline is returned. */ func BuildPipeline(descriptor *parse.PipelineDesc, library Library) (*Pipeline, error) { logger := pipelineLogger() - producer, err := collectProducer(descriptor, library.Ctx(), library, logger) + producer, err := collectProducer(descriptor, library, logger) if err != nil { return nil, err } diff --git a/core/run_test.go b/core/run_test.go index cd12a8a..992773a 100644 --- a/core/run_test.go +++ b/core/run_test.go @@ -4,9 +4,12 @@ import ( "errors" "fmt" "math" + "strings" "testing" "time" + "github.com/gastrodon/psyduck/parse" + "github.com/hashicorp/hcl/v2" "github.com/psyduck-etl/sdk" "github.com/sirupsen/logrus" ) @@ -187,6 +190,113 @@ func Test_RunPipeline_filtering(test *testing.T) { } } +func drawDiags(d hcl.Diagnostics) string { + buf := make([]string, len(d)) + for i, diag := range d { + buf[i] = diag.Error() + } + + return strings.Join(buf, "\n") +} + +func Test_RunPipeline_remote(t *testing.T) { + pin := &sdk.Plugin{ + Name: "test", + Resources: []*sdk.Resource{ + { + Kinds: sdk.PRODUCER, + Name: "mono", + Spec: []*sdk.Spec{}, + ProvideProducer: func(parse sdk.Parser) (sdk.Producer, error) { + cfg := new(struct { + V byte `cty:"v"` + }) + + if err := parse(cfg); err != nil { + return nil, err + } + + return func(send chan<- []byte, errs chan<- error) { + send <- []byte{cfg.V} + close(send) + }, nil + }, + }, + { + Kinds: sdk.PRODUCER, + Name: "meta", + Spec: []*sdk.Spec{}, + ProvideProducer: func(parse sdk.Parser) (sdk.Producer, error) { + cfg := new(struct { + V byte `cty:"v"` + C int `cty:"c"` + }) + + if err := parse(cfg); err != nil { + return nil, err + } + + return func(send chan<- []byte, errs chan<- error) { + for i := 0; i < cfg.C; i++ { + send <- []byte(fmt.Sprintf(` + produce "mono" { + v = %d + }`, cfg.V)) + } + close(send) + }, nil + }, + }, + }, + } + + lib := NewLibrary([]*sdk.Plugin{pin}) + + group, diags := parse.NewFile("test-remote-producers", []byte(` + produce-from "meta" { + v = 100 + c = 5 + } + + produce-from "meta" { + v = 50 + c = 10 + }`)).Pipelines(lib.Ctx()) + if diags.HasErrors() { + t.Fatalf("failed to parse: %s", drawDiags(diags)) + } + + pline, err := BuildPipeline(group.Monify(), lib) + if err != nil { + t.Fatalf("failed to build: %s", err) + } + + tabs := make(map[byte]int, 2) + alt := pline.Transformer + pline.Transformer = func(in []byte) ([]byte, error) { + if v, ok := tabs[in[0]]; ok { + tabs[in[0]] = v + 1 + } else { + tabs[in[0]] = 1 + } + + return alt(in) + } + + if err := RunPipeline(pline); err != nil { + t.Fatalf("failed to run: %s", err) + } + + if tabs[100] != 5 { + t.Fatalf("tabs[100] mismatch: 5 != %d", tabs[100]) + } + + if tabs[50] != 10 { + t.Fatalf("tabs[50] mismatch: 10 != %d", tabs[50]) + + } +} + type testHook struct { fn func() } diff --git a/parse/pipeline.go b/parse/pipeline.go index e0f58df..7023d56 100644 --- a/parse/pipeline.go +++ b/parse/pipeline.go @@ -13,22 +13,6 @@ type pipelineParts struct { Transformers []*MoverDesc `hcl:"transform,block"` } -// TODO this should take a library.Ctx! it should look more like Literal -// update: this should be swap-in replaceable with Literal -func Partial(filename string, literal []byte, context *hcl.EvalContext) (*pipelineParts, hcl.Diagnostics) { - file, diags := hclparse.NewParser().ParseHCL(literal, filename) - if diags.HasErrors() { - return nil, diags - } - - resources := new(pipelineParts) - if diags := gohcl.DecodeBody(file.Body, context, resources); diags.HasErrors() { - return nil, diags - } - - return resources, nil -} - type MoverDesc struct { Kind string `hcl:"resource,label" cty:"resource"` Options map[string]cty.Value `hcl:",remain" cty:"options"`