Skip to content

Commit

Permalink
node updates #9
Browse files Browse the repository at this point in the history
  • Loading branch information
lnashier authored May 13, 2024
2 parents c6311a7 + 050e396 commit 15f9006
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 40 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The `glow` is an idiomatic general purpose computational network framework.
- [x] Transit node
- [x] Broadcaster mode
- [x] Distributor mode
- [ ] Router mode
- [ ] ~~Router mode~~
- [x] Emitter mode
- [x] Filter
- [x] Sessions
Expand All @@ -25,9 +25,8 @@ The `glow` is an idiomatic general purpose computational network framework.
- [ ] Network integrity checks
- [x] Avoid cycles
- [x] Isolated nodes
- [ ] Statistics
- [x] Tally
- [ ] ~~Most & least used path~~
- [ ] Node response time
- [x] Link tally
- [ ] ~~Most & least used paths~~
- [ ] ~~Fastest & slowest paths~~
- [ ] ~~Network modifications while network is up (e.g. Remove link, Add Link)~~
- [ ] ~~ACK~~
4 changes: 2 additions & 2 deletions examples/abstracts/evenoddloop1seednet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func Network() *glow.Network {
node1ID := keygen()
nodeInCounts.Store(node1ID, make([]int, 0))
nodeOutCounts.Store(node1ID, make([]int, 0))
n.AddNode(glow.EmitterFunc(func(ctx context.Context, in1 any, emit func(any)) error {
n.AddNode(glow.EmitFunc(func(ctx context.Context, in1 any, emit func(any)) error {
in := in1.(int)
inCounts, _ := nodeInCounts.Load(node1ID)
nodeInCounts.Store(node1ID, append(inCounts.([]int), in))
Expand All @@ -107,7 +107,7 @@ func Network() *glow.Network {
node2ID := keygen()
nodeInCounts.Store(node2ID, make([]int, 0))
nodeOutCounts.Store(node2ID, make([]int, 0))
n.AddNode(glow.EmitterFunc(func(ctx context.Context, in1 any, emit func(any)) error {
n.AddNode(glow.EmitFunc(func(ctx context.Context, in1 any, emit func(any)) error {
in := in1.(int)
inCounts, _ := nodeInCounts.Load(node2ID)
nodeInCounts.Store(node2ID, append(inCounts.([]int), in))
Expand Down
4 changes: 2 additions & 2 deletions examples/abstracts/fandistributsnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func addSeed(net *glow.Network, nodeID string, emitter bool, opt ...glow.NodeOpt
nodeOutCounts.Store(nodeID, make([]int, 0))

if emitter {
opt = append(opt, glow.EmitterFunc(func(ctx context.Context, _ any, emit func(any)) error {
opt = append(opt, glow.EmitFunc(func(ctx context.Context, _ any, emit func(any)) error {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -143,7 +143,7 @@ func addNode(net *glow.Network, nodeID string, emitter bool, opt ...glow.NodeOpt
nodeOutCounts.Store(nodeID, make([]int, 0))

if emitter {
opt = append(opt, glow.EmitterFunc(func(ctx context.Context, in1 any, emit func(any)) error {
opt = append(opt, glow.EmitFunc(func(ctx context.Context, in1 any, emit func(any)) error {
in := in1.(int)
inCounts, _ := nodeInCounts.Load(nodeID)
nodeInCounts.Store(nodeID, append(inCounts.([]int), in))
Expand Down
3 changes: 3 additions & 0 deletions examples/wordcount/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ func main() {
return plug.ReadFile("test.txt", emit)
}).
Map(func(ctx context.Context, in any, emit func(any)) error {
// Receiver function (method) cannot have type parameters.
// This is the cleanest way at this moment.
// https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#no-parameterized-methods
plug.Tokenize(ctx, in.(string), emit)
return nil
}).
Expand Down
4 changes: 2 additions & 2 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *Seq) Error() error {

func (s *Seq) seed(kind StepKind, sf func(ctx context.Context, emit func(any)) error, opts *opts) (*Step, error) {
nodeOpts := []glow.NodeOpt{
glow.EmitterFunc(func(ctx context.Context, _ any, emit func(any)) error {
glow.EmitFunc(func(ctx context.Context, _ any, emit func(any)) error {
return sf(ctx, emit)
}),
glow.Key(fmt.Sprintf("%s-%s", s.keygen(), kind)),
Expand All @@ -246,7 +246,7 @@ func (s *Seq) seed(kind StepKind, sf func(ctx context.Context, emit func(any)) e

func (s *Seq) transit(kind StepKind, tf func(ctx context.Context, in any, emit func(any)) error, opts *opts) (*Step, error) {
nodeOpts := []glow.NodeOpt{
glow.EmitterFunc(func(ctx context.Context, in any, emit func(any)) error {
glow.EmitFunc(func(ctx context.Context, in any, emit func(any)) error {
return tf(ctx, in, emit)
}),
glow.Key(fmt.Sprintf("%s-%s", s.keygen(), kind)),
Expand Down
16 changes: 4 additions & 12 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func (l *Link) Uptime() time.Duration {
if l.deleted || l.paused {
return 0
}
if !l.x.start.IsZero() && !l.y.start.IsZero() {
stop := l.x.stop
if !l.x.session.start.IsZero() && !l.y.session.start.IsZero() {
stop := l.x.session.stop
if stop.IsZero() {
stop = l.y.stop
stop = l.y.session.stop
}
if stop.IsZero() {
stop = time.Now()
}
return stop.Sub(l.y.start)
return stop.Sub(l.y.session.start)
}

return 0
Expand Down Expand Up @@ -315,11 +315,3 @@ func (n *Network) refreshLinks(node *Node) {
}
}
}

func (n *Network) refreshNodes() {
for _, node := range n.Nodes() {
node.start = time.Time{}
node.stop = time.Time{}
n.refreshLinks(node)
}
}
48 changes: 31 additions & 17 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,37 @@ import (
// In broadcaster mode, Node broadcasts all incoming data to all outgoing links.
// When the distributor flag is enabled, a Node distributes incoming data among its outgoing links.
// Distributor mode is not functional for isolated and terminus nodes.
// - By default, a Node operates in "push-pull" mode, where the Network pushes data to NodeFunc and waits for NodeFunc to return.
// This behavior can be changed to "push-push" by setting the emitter setting for the Node.
// In emitter mode, the Network pushes data to NodeFunc, and the Node emits data back to the Network through the supplied channel.
// Emitter mode is not functional for isolated and terminus nodes.
// - By default, a Node operates in "push-pull" mode: the Network pushes data to NodeFunc,
// and it waits for NodeFunc to return with output data, which is then forwarded to connected Node(s).
// This behavior can be changed to "push-push" by setting the EmitFunc for the Node.
// In emitter mode, the Network pushes data to EmitFunc, and the Node emits data back to the Network
// through the supplied callback emit function.
type Node struct {
key string
f func(context.Context, any) (any, error)
ef func(context.Context, any, func(any)) error
distributor bool
session nodeSession
}

// session
type nodeSession struct {
start time.Time
stop time.Time
}

type NodeOpt func(*Node)

func (n *Node) Key() string {
return n.key
}

func (n *Node) Uptime() time.Duration {
if n.stop.IsZero() {
return time.Since(n.start)
if n.session.stop.IsZero() {
return time.Since(n.session.start)
}
return n.stop.Sub(n.start)
return n.session.stop.Sub(n.session.start)
}

type NodeOpt func(*Node)

func (n *Node) apply(opt ...NodeOpt) {
for _, o := range opt {
o(n)
Expand All @@ -71,21 +74,25 @@ func Key(k string) NodeOpt {
}
}

// Distributor enables a Node to distribute incoming data among its outgoing links.
func Distributor() NodeOpt {
return func(n *Node) {
n.distributor = true
}
}

// NodeFunc is the function responsible for processing incoming data on the Node.
// NodeFunc is responsible for processing incoming data on the Node.
// Output from the Node is forwarded to downstream connected Node(s).
func NodeFunc(f func(ctx context.Context, data any) (any, error)) NodeOpt {
return func(n *Node) {
n.f = f
}
}

// EmitterFunc is similar to NodeFunc, but it additionally provides a callback where data can be emitted.
func EmitterFunc(f func(ctx context.Context, data any, emit func(any)) error) NodeOpt {
// EmitFunc handles processing incoming data on the Node.
// It provides a callback where output data can be optionally emitted.
// Emitted data is forwarded to downstream connected Node(s).
func EmitFunc(f func(ctx context.Context, data any, emit func(any)) error) NodeOpt {
return func(n *Node) {
n.ef = f
}
Expand Down Expand Up @@ -241,10 +248,10 @@ func (n *Network) nodeUp(ctx context.Context, node *Node) error {
return ErrIsolatedNodeFound
}

node.start = time.Now()
node.stop = time.Time{}
node.session.start = time.Now()
node.session.stop = time.Time{}
defer func() {
node.stop = time.Now()
node.session.stop = time.Now()
}()

var egressYs string
Expand Down Expand Up @@ -359,7 +366,7 @@ func (n *Network) nodeUp(ctx context.Context, node *Node) error {

nodeWg, nodeCtx := errgroup.WithContext(ctx)

// When transit-node is in emitter mod, Node emitter function is called for every incoming data point.
// When transit-node is in emitter mode, Node emitter function is called for every incoming data point.
// Transit-node can choose to emit as many data points and return control back to get next incoming data point.
nf := node.ef
if nf == nil {
Expand Down Expand Up @@ -514,3 +521,10 @@ func (n *Network) nodeUp(ctx context.Context, node *Node) error {

return nil
}

func (n *Network) refreshNodes() {
for _, node := range n.Nodes() {
node.session = nodeSession{}
n.refreshLinks(node)
}
}

0 comments on commit 15f9006

Please sign in to comment.