Type Safe Channels (pipe)

Go's concurrency features make it easy to build streaming data pipelines that make efficient use of I/O and multiple CPUs. The Go blog article "Go Concurrency Patterns: Pipelines and cancellation" explains the topic in depth. Despite this simplicity, boilerplate code is still required to create channels and set up the "chrome" that manages them. This module offers a consistent set of operations over channels for building processing pipelines cleanly and efficiently.

The module supports type-safe channels in two flavors, sequential pipe and parallel fork, for building pipelines. It treats channels as a "sequential data structure" and tries to derive its semantics from the stream interface.

Quick Example

The example below is the simplest illustration of composing a processing pipeline.

package main

import (
  "context"
  "fmt"
  "strconv"

  "github.com/fogfish/golem/pipe"
)

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  // Capacity argument passed to Unfold; assumed to be the channel
  // buffer size, the value here is illustrative.
  const capacity = 1

  // Generate sequence of integers
  ints := pipe.StdErr(pipe.Unfold(ctx, capacity, 0,
    func(x int) (int, error) { return x + 1, nil },
  ))

  // Limit sequence of integers
  ints10 := pipe.TakeWhile(ctx, ints,
    func(x int) bool { return x <= 10 },
  )

  // Calculate squares
  squares := pipe.StdErr(pipe.Map(ctx, ints10,
    func(x int) (int, error) { return x * x, nil },
  ))

  // Convert numbers to strings
  vals := pipe.StdErr(pipe.Map(ctx, squares,
    func(x int) (string, error) { return strconv.Itoa(x), nil },
  ))

  // Output strings; block until the pipeline is drained
  <-pipe.ForEach(ctx, vals,
    func(x string) { fmt.Printf("==> %s\n", x) },
  )
}

Stream interface

Supported features

  • emit takes a function and sends the data it produces to the channel at a specified frequency.
  • filter returns a newly-allocated channel that contains only those elements of the input channel for which the predicate is true.
  • foreach applies a function to each message in the channel.
  • map applies a function to each channel message and emits the result to a new channel.
  • fold applies a monoid operation to the values in a channel. The final value is emitted through the returned channel when the end of the input channel is reached.
  • join concatenates channels, returning a newly-allocated channel composed of elements copied from the input channels.
  • partition splits a channel into two channels according to a predicate.
  • take returns a newly-allocated channel containing the first n elements of the input channel.
  • takeWhile returns a newly-allocated channel that contains elements from the input channel while the predicate returns true.
  • unfold is the fundamental recursive constructor: it applies a function to each previous seed element in turn to determine the next element.

Unsupported features

  • drop returns the suffix of the input channel that starts at the next element after the first n elements.
  • dropWhile drops elements from the channel while the predicate returns true and returns the remaining channel suffix.
  • split partitions a channel into two channels. It behaves as if it were defined as a take followed by a drop.
  • splitWhile partitions a channel into two channels according to a predicate. It behaves as if it were defined as a takeWhile followed by a dropWhile.
  • flatten reduces the dimension of a channel of channels.
  • scan accumulates the partial folds of an input channel into a newly-allocated channel.
  • zip takes one or more input channels and returns a newly-allocated channel in which each element is a product of the corresponding elements of the input channels. The output channel is as long as the shortest input stream.
  • zipWith takes one or more input channels and returns a newly-allocated channel in which each element is produced by a composition function that maps the list of input heads to a new head. The output stream is as long as the longest input stream.