diff --git a/pkg/node/manytoone.go b/pkg/node/manytoone.go index 6f02c742..2b5bbd33 100644 --- a/pkg/node/manytoone.go +++ b/pkg/node/manytoone.go @@ -96,8 +96,8 @@ func (n *ManyToOneNode) forward(index int) port.Listener { defer n.mu.RUnlock() inReader := n.inPorts[index].Open(proc) - outWriter := n.outPort.Open(proc) - errWriter := n.errPort.Open(proc) + var outWriter *packet.Writer + var errWriter *packet.Writer readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) { inReaders := make([]*packet.Reader, len(n.inPorts)) @@ -110,6 +110,13 @@ func (n *ManyToOneNode) forward(index int) port.Listener { for inPck := range inReader.Read() { n.tracer.Read(inReader, inPck) + if outWriter == nil { + outWriter = n.outPort.Open(proc) + } + if errWriter == nil { + errWriter = n.errPort.Open(proc) + } + if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) { n.tracer.Reduce(inPck) } else if outPck, errPck := n.action(proc, inPcks); errPck != nil { diff --git a/pkg/node/onetomany.go b/pkg/node/onetomany.go index 48fef2a6..8e4d1cd5 100644 --- a/pkg/node/onetomany.go +++ b/pkg/node/onetomany.go @@ -94,15 +94,21 @@ func (n *OneToManyNode) forward(proc *process.Process) { defer n.mu.RUnlock() inReader := n.inPort.Open(proc) - outWriters := make([]*packet.Writer, len(n.outPorts)) - for i, outPort := range n.outPorts { - outWriters[i] = outPort.Open(proc) - } - errWriter := n.errPort.Open(proc) + outWriters := make([]*packet.Writer, 0, len(n.outPorts)) + var errWriter *packet.Writer for inPck := range inReader.Read() { n.tracer.Read(inReader, inPck) + if len(outWriters) == 0 { + for _, outPort := range n.outPorts { + outWriters = append(outWriters, outPort.Open(proc)) + } + } + if errWriter == nil { + errWriter = n.errPort.Open(proc) + } + if outPcks, errPck := n.action(proc, inPck); errPck != nil { n.tracer.Transform(inPck, errPck) n.tracer.Write(errWriter, errPck) diff --git a/pkg/node/onetoone.go b/pkg/node/onetoone.go index f9773845..a78f933b 100644 --- a/pkg/node/onetoone.go +++ b/pkg/node/onetoone.go @@ -69,12 +69,19 @@ func (n *OneToOneNode) Close() error { func (n *OneToOneNode) forward(proc *process.Process) { inReader := n.inPort.Open(proc) - outWriter := n.outPort.Open(proc) - errWriter := n.errPort.Open(proc) + var outWriter *packet.Writer + var errWriter *packet.Writer for inPck := range inReader.Read() { n.tracer.Read(inReader, inPck) + if outWriter == nil { + outWriter = n.outPort.Open(proc) + } + if errWriter == nil { + errWriter = n.errPort.Open(proc) + } + if outPck, errPck := n.action(proc, inPck); errPck != nil { n.tracer.Transform(inPck, errPck) n.tracer.Write(errWriter, errPck) diff --git a/pkg/symbol/cluster.go b/pkg/symbol/cluster.go index 261fe892..b0fb9c8c 100644 --- a/pkg/symbol/cluster.go +++ b/pkg/symbol/cluster.go @@ -1,6 +1,7 @@ package symbol import ( + "github.com/siyul-park/uniflow/pkg/packet" "sync" "github.com/siyul-park/uniflow/pkg/node" @@ -208,9 +209,12 @@ func (n *Cluster) Close() error { func (n *Cluster) inbound(inPort *port.InPort, outPort *port.OutPort) port.Listener { return port.ListenFunc(func(proc *process.Process) { reader := inPort.Open(proc) - writer := outPort.Open(proc) + var writer *packet.Writer for inPck := range reader.Read() { + if writer == nil { + writer = outPort.Open(proc) + } if writer.Write(inPck) == 0 { reader.Receive(inPck) } @@ -220,10 +224,13 @@ func (n *Cluster) inbound(inPort *port.InPort, outPort *port.OutPort) port.Liste func (n *Cluster) outbound(inPort *port.InPort, outPort *port.OutPort) port.Listener { return port.ListenFunc(func(proc *process.Process) { - reader := inPort.Open(proc) + var reader *packet.Reader writer := outPort.Open(proc) for backPck := range writer.Receive() { + if reader == nil { + reader = inPort.Open(proc) + } reader.Receive(backPck) } })