Skip to content

Commit

Permalink
Merge pull request #67 from ipfs/feat/streaming-set
Browse files Browse the repository at this point in the history
add a streaming CID set
  • Loading branch information
Stebalien committed Aug 11, 2018
2 parents 1543f4a + 3655c1c commit 83a7594
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions set.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package cid

import (
"context"
)

// Set is a implementation of a set of Cids, that is, a structure
// to which holds a single copy of every Cids that is added to it.
type Set struct {
Expand Down Expand Up @@ -65,3 +69,34 @@ func (s *Set) ForEach(f func(c *Cid) error) error {
}
return nil
}

// StreamingSet is an extension of Set which allows to implement back-pressure
// for the Visit function
type StreamingSet struct {
Set *Set
New chan *Cid
}

// NewStreamingSet initializes and returns new Set.
func NewStreamingSet() *StreamingSet {
return &StreamingSet{
Set: NewSet(),
New: make(chan *Cid),
}
}

// Visitor creates new visitor which adds a Cids to the set and emits them to
// the set.New channel
func (s *StreamingSet) Visitor(ctx context.Context) func(c *Cid) bool {
return func(c *Cid) bool {
if s.Set.Visit(c) {
select {
case s.New <- c:
case <-ctx.Done():
}
return true
}

return false
}
}

0 comments on commit 83a7594

Please sign in to comment.