diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 993fe98d9..2e3945e56 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -206,7 +206,7 @@ func isReceptor(p *pipelineEntry) bool { } func isSender(p *pipelineEntry) bool { - return p.stageType != StageWrite + return p.stageType != StageWrite && p.stageType != StageEncode } func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) { @@ -236,11 +236,13 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, } }) case StageEncode: - stage = node.AsTerminal(func(in <-chan []config.GenericMap) { + encode := node.AsTerminal(func(in <-chan []config.GenericMap) { for i := range in { pe.Encoder.Encode(i) } }) + b.terminalNodes = append(b.terminalNodes, encode) + stage = encode case StageTransform: stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { for i := range in {