From 54ee6f92e777b926eae908fda9f75f8f55eb2d37 Mon Sep 17 00:00:00 2001 From: Kirill Tishenkov Date: Mon, 22 Jan 2024 16:00:47 +0100 Subject: [PATCH] NCR-14625 add Apply --- apply.go | 40 ++++++++++++++++++++++++++++++++++ apply_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ collect_test.go | 1 - 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 apply.go create mode 100644 apply_test.go diff --git a/apply.go b/apply.go new file mode 100644 index 0000000..5fe8795 --- /dev/null +++ b/apply.go @@ -0,0 +1,40 @@ +package pipeline + +import "context" + +type apply[A, B, C any] struct { + a Processor[A, []B] + b Processor[B, C] +} + +func (j *apply[A, B, C]) Process(ctx context.Context, a A) ([]C, error) { + bs, err := j.a.Process(ctx, a) + if err != nil { + j.a.Cancel(a, err) + return []C{}, err + } + + cs := make([]C, 0, len(bs)) + + for i := range bs { + c, err := j.b.Process(ctx, bs[i]) + if err != nil { + j.b.Cancel(bs[i], err) + return cs, err + } + + cs = append(cs, c) + } + + return cs, nil +} + +func (j *apply[A, B, C]) Cancel(_ A, _ error) {} + +// Apply connects two processes, applying the second to each item of the first output +func Apply[A, B, C any]( + a Processor[A, []B], + b Processor[B, C], +) Processor[A, []C] { + return &apply[A, B, C]{a, b} +} diff --git a/apply_test.go b/apply_test.go new file mode 100644 index 0000000..54cf973 --- /dev/null +++ b/apply_test.go @@ -0,0 +1,57 @@ +package pipeline + +import ( + "context" + "strings" + "testing" +) + +func TestLoopApply(t *testing.T) { + transform := NewProcessor(func(_ context.Context, s string) ([]string, error) { + return strings.Split(s, ","), nil + }, nil) + + double := NewProcessor(func(_ context.Context, s string) (string, error) { + return s + s, nil + }, nil) + + addLeadingZero := NewProcessor(func(_ context.Context, s string) (string, error) { + return "0" + s, nil + }, nil) + + looper := Apply( + transform, + Sequence( + double, + addLeadingZero, + double, + ), + ) + + gotCount := 0 + input := "1,2,3,4,5" + want := []string{"011011", "022022", "033033", "044044", "055055"} + + for out := range Process(context.Background(), looper, Emit(input)) { + for j := range out { + gotCount++ + if !contains(want, out[j]) { + t.Errorf("does not contains got=%v, want=%v", out[j], want) + } + } + } + + if gotCount != len(want) { + t.Errorf("total results got=%v, want=%v", gotCount, len(want)) + } +} + +func contains(s []string, e string) bool { + for i := range s { + if s[i] == e { + return true + } + } + + return false +} diff --git a/collect_test.go b/collect_test.go index c52abca..7f66a2c 100644 --- a/collect_test.go +++ b/collect_test.go @@ -152,7 +152,6 @@ func TestCollect(t *testing.T) { if !reflect.DeepEqual(test.want.out, outs) { t.Errorf("out = %v, want %v", outs, test.want.out) } - }) } }