Skip to content

Commit

Permalink
fix: lazy open reader and writer
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 27, 2024
1 parent 3555873 commit e4e4320
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
11 changes: 9 additions & 2 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
defer n.mu.RUnlock()

inReader := n.inPorts[index].Open(proc)
outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
inReaders := make([]*packet.Reader, len(n.inPorts))
Expand All @@ -110,6 +110,13 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
n.tracer.Reduce(inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
Expand Down
16 changes: 11 additions & 5 deletions pkg/node/onetomany.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,21 @@ func (n *OneToManyNode) forward(proc *process.Process) {
defer n.mu.RUnlock()

inReader := n.inPort.Open(proc)
outWriters := make([]*packet.Writer, len(n.outPorts))
for i, outPort := range n.outPorts {
outWriters[i] = outPort.Open(proc)
}
errWriter := n.errPort.Open(proc)
outWriters := make([]*packet.Writer, 0, len(n.outPorts))
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if len(outWriters) == 0 {
for _, outPort := range n.outPorts {
outWriters = append(outWriters, outPort.Open(proc))
}
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if outPcks, errPck := n.action(proc, inPck); errPck != nil {
n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
Expand Down
11 changes: 9 additions & 2 deletions pkg/node/onetoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,19 @@ func (n *OneToOneNode) Close() error {

func (n *OneToOneNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
outWriter := n.outPort.Open(proc)
errWriter := n.errPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if outPck, errPck := n.action(proc, inPck); errPck != nil {
n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
Expand Down
11 changes: 9 additions & 2 deletions pkg/symbol/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package symbol

import (
"github.com/siyul-park/uniflow/pkg/packet"
"sync"

"github.com/siyul-park/uniflow/pkg/node"
Expand Down Expand Up @@ -208,9 +209,12 @@ func (n *Cluster) Close() error {
func (n *Cluster) inbound(inPort *port.InPort, outPort *port.OutPort) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
reader := inPort.Open(proc)
writer := outPort.Open(proc)
var writer *packet.Writer

for inPck := range reader.Read() {
if writer == nil {
writer = outPort.Open(proc)
}
if writer.Write(inPck) == 0 {
reader.Receive(inPck)
}
Expand All @@ -220,10 +224,13 @@ func (n *Cluster) inbound(inPort *port.InPort, outPort *port.OutPort) port.Liste

func (n *Cluster) outbound(inPort *port.InPort, outPort *port.OutPort) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
reader := inPort.Open(proc)
var reader *packet.Reader
writer := outPort.Open(proc)

for backPck := range writer.Receive() {
if reader == nil {
reader = inPort.Open(proc)
}
reader.Receive(backPck)
}
})
Expand Down

0 comments on commit e4e4320

Please sign in to comment.