Skip to content

Commit

Permalink
made encode stage terminal
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed May 1, 2022
1 parent d8a54c1 commit c71e9ed
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func isReceptor(p *pipelineEntry) bool {
}

func isSender(p *pipelineEntry) bool {
return p.stageType != StageWrite
return p.stageType != StageWrite && p.stageType != StageEncode
}

func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{}, error) {
Expand Down Expand Up @@ -236,11 +236,13 @@ func (b *builder) getStageNode(pe *pipelineEntry, stageID string) (interface{},
}
})
case StageEncode:
stage = node.AsTerminal(func(in <-chan []config.GenericMap) {
encode := node.AsTerminal(func(in <-chan []config.GenericMap) {
for i := range in {
pe.Encoder.Encode(i)
}
})
b.terminalNodes = append(b.terminalNodes, encode)
stage = encode
case StageTransform:
stage = node.AsMiddle(func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) {
for i := range in {
Expand Down

0 comments on commit c71e9ed

Please sign in to comment.