From c71e9ed8313d0ae003afa37a55608d51ec68674a Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Sun, 1 May 2022 11:42:48 +0300 Subject: [PATCH] made encode stage terminal --- pkg/pipeline/pipeline_builder.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 993fe98d9..2e3945e56 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -206,7 +206,7 @@ func isReceptor(p *pipelineEntry) bool { } func isSender(p *pipelineEntry) bool { - return p.stageType != StageWrite + return p.stageType != StageWrite && p.stageType != StageEncode } func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) { @@ -236,11 +236,13 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, } }) case StageEncode: - stage = node.AsTerminal(func(in <-chan []config.GenericMap) { + encode := node.AsTerminal(func(in <-chan []config.GenericMap) { for i := range in { pe.Encoder.Encode(i) } }) + b.terminalNodes = append(b.terminalNodes, encode) + stage = encode case StageTransform: stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { for i := range in {