-
Notifications
You must be signed in to change notification settings - Fork 1
/
pump.go
382 lines (311 loc) · 10.1 KB
/
pump.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
/*
Package pump provides a minimalist framework for composing data processing pipelines.
The pipelines are type-safe, impose little overhead, and can be composed either statically,
or dynamically (for example, as a function of configuration). A running pipeline stops on and
returns the first error encountered.
The package defines two generic types:
- Data generator Gen[T]: a callback-based ("push") iterator that supplies a stream of data of
any type T, and
- Pipeline stage Stage[T,U]: a function that invokes input generator Gen[T], does whatever processing
it is programmed to do, and feeds the supplied callback with data items of type U.
The package also provides a basic set of functions for composing pipeline stages and binding stages
to generators, as well as support for pipelining and parallel execution.
*/
package pump
import (
"context"
"errors"
"iter"
"runtime"
"sync"
"sync/atomic"
)
// generate chain functions
//go:generate ./gen-chains chain.go
/*
Gen is a generic push iterator, or a generator. When invoked with a user-provided callback, it
is expected to iterate its data source, invoking the callback once per data item, and to stop
on the first error — whether returned from the callback or encountered internally. It is up
to the user to develop their own generators, because it's not possible to provide a generic
implementation for all possible data sources. Also, there is one caveat: some generators can be
run only once (for example, those sourcing data from a socket), so please structure your code
accordingly.
*/
type Gen[T any] func(func(T) error) error
// Iter constructs a new iterator wrapping the given generator, ready to be
// ranged over via It.All.
func (src Gen[T]) Iter() It[T] {
	var it It[T]
	it.src = src
	return it
}
/*
Bind composes an existing generator of some type T with the given stage function,
producing a new generator of type U. The stage performs the T -> U conversion and
controls when and how the source generator is invoked.
*/
func Bind[T, U any](src Gen[T], stage Stage[T, U]) Gen[U] {
	return func(sink func(U) error) error {
		return stage(src, sink)
	}
}
/*
It is an iterator over the given generator. Its main purpose is to provide
a function to range over using a for loop. Since the release of Go v1.23 everybody
does range-over-function, so me too. Given some type T and a generator "src" of type
Gen[T], we can then do:

	it := src.Iter()
	for item := range it.All {
		// process item
	}
	if it.Err != nil { ... }

A generator like "src" is typically constructed as some input generator bound
to a processing stage using the Bind() function.
*/
type It[T any] struct {
	Err error // first error returned from the pipeline; nil after a clean run or an early loop exit
	src Gen[T]
}
// All is the function to range over using a for loop. A `break` from the loop is
// translated into the ErrStop sentinel internally and is not reported via Err.
func (it *It[T]) All(yield func(T) bool) {
	err := it.src(func(item T) error {
		if yield(item) {
			return nil
		}
		// the loop body asked to stop; this is not a real failure
		return ErrStop
	})
	if errors.Is(err, ErrStop) {
		err = nil
	}
	it.Err = err
}
/*
Stage is a generic type (a function) representing a pipeline stage. For any given types T and U,
the function takes a generator of type Gen[T] and a callback of type func(U) error. When invoked,
it is expected to run the generator, do whatever processing it is programmed to do, and call
the callback function once per data element produced. A stage function is expected to stop at
and return the first error (if any) from either the callback or the iteration itself. The
signature of the function is designed to allow for full control over when and how the source
generator is invoked. For example, suppose we want to have a pipeline stage where processing of
each input item involves database queries, and we also want to establish a database connection
before the iteration, and close it afterwards. This can be achieved using the following stage
function (for some already defined types T and U):

	func process(src pump.Gen[T], yield func(U) error) error {
		conn, err := connectToDatabase()
		if err != nil {
			return err
		}
		defer conn.Close()
		return src(func(item T) error { // this actually invokes the source generator
			// produce a result of type U
			result, err := produceResult(item, conn)
			if err != nil {
				return err
			}
			// pass the result further down the pipeline
			return yield(result)
		})
	}
*/
type Stage[T, U any] func(Gen[T], func(U) error) error
// Filter creates a stage function that passes through only those input items
// for which the given predicate returns true.
func Filter[T any](pred func(T) bool) Stage[T, T] {
	return func(src Gen[T], yield func(T) error) error {
		return src(func(item T) error {
			if !pred(item) {
				return nil // drop the item silently
			}
			return yield(item)
		})
	}
}
// Map creates a stage function that converts each data element via the given
// (infallible) function.
func Map[T, U any](fn func(T) U) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error {
		emit := func(item T) error { return yield(fn(item)) }
		return src(emit)
	}
}
// MapE creates a stage function that converts each data element via the given
// fallible function, stopping on the first error encountered, if any.
func MapE[T, U any](fn func(T) (U, error)) Stage[T, U] {
	return func(src Gen[T], yield func(U) error) error {
		return src(func(item T) error {
			out, err := fn(item)
			if err == nil {
				err = yield(out)
			}
			return err
		})
	}
}
// Pipe is a stage function that runs its source in a separate goroutine.
// It is equivalent to PipeCtx with a background context.
func Pipe[T any](src Gen[T], yield func(T) error) error {
	return PipeCtx[T](context.Background())(src, yield)
}
// PipeCtx creates a stage function that runs its source in a separate goroutine.
// The lifetime of the pipe is managed via the given context: cancelling the
// context stops the pipe.
func PipeCtx[T any](ctx context.Context) Stage[T, T] {
	return func(src Gen[T], sink func(T) error) error {
		return pipeCtx(ctx, src, sink)
	}
}
// chanCap is the capacity of the channel between pipe stages; a small buffer
// decouples producer and consumer without holding many items in flight.
const chanCap = 32
// pipeCtx is the pipe implementation: it starts a feeder goroutine running the
// source generator and drains its channel into the yield callback, all within
// a managed pipe environment.
func pipeCtx[T any](ctx context.Context, src Gen[T], yield func(T) error) error {
	run := func(env *pipeEnv) error {
		feeder := startFeeder(env, src)
		return readChan(feeder, yield)
	}
	return execInCtx(ctx, run)
}
// Parallel constructs a stage function that invokes the given stage from n
// goroutines in parallel. The value of n has the upper bound of 100 * runtime.NumCPU().
// Zero or negative value of n corresponds to runtime.NumCPU(). This stage does not
// preserve the order of data items. It is equivalent to ParallelCtx with a
// background context.
func Parallel[T, U any](n int, stage Stage[T, U]) Stage[T, U] {
	return ParallelCtx(context.Background(), n, stage)
}
// ParallelCtx constructs a stage function that invokes the given stage from n
// goroutines in parallel, under control of the given context. The value of n has
// the upper bound of 100 * runtime.NumCPU(). Zero or negative value of n corresponds
// to runtime.NumCPU(). This stage does not preserve the order of data items.
func ParallelCtx[T, U any](ctx context.Context, n int, stage Stage[T, U]) Stage[T, U] {
	// ensure realistic value for n
	np := runtime.NumCPU()
	if n <= 0 {
		n = np
	} else {
		n = min(n, np*100)
	}
	// the stage
	return func(src Gen[T], yield func(U) error) error {
		return execInCtx(ctx, func(env *pipeEnv) error {
			// feeder channel: the source generator writes here from its own goroutine
			feeder := startFeeder(env, src)
			// generator view of the feeder channel, shared by all workers
			gen := func(fn func(T) error) error { return readChan(feeder, fn) }
			// collector channel: workers write their results here
			collector := make(chan U, chanCap)
			// collector sink, honouring context cancellation
			sink := toChan(env, collector)
			// start workers; the last one to finish closes the collector
			var refCount atomic.Int32
			refCount.Store(int32(n))
			env.wg.Add(n)
			for range n {
				go func() {
					defer func() {
						if refCount.Add(-1) == 0 {
							close(collector)
						}
						env.wg.Done()
					}()
					// any worker error becomes the pipe's cancellation cause
					env.safe(stage(gen, sink))
				}()
			}
			// drain the collector on the calling goroutine
			return readChan(collector, yield)
		})
	}
}
// pipeEnv is the synchronisation pack shared by pipe stages: a cancellable
// context whose cancellation cause records the first error, and a WaitGroup
// tracking every goroutine the pipe machinery starts.
type pipeEnv struct {
	ctx    context.Context
	cancel context.CancelCauseFunc
	wg     sync.WaitGroup
}
// safe records a non-nil error as the pipe's cancellation cause; a nil
// error is a no-op. Only the first recorded cause is retained by the context.
func (env *pipeEnv) safe(err error) {
	if err == nil {
		return
	}
	env.cancel(err)
}
// execInCtx creates a pipe environment (cancellable context + WaitGroup) and runs
// the given function in it, returning the first error recorded as the context's
// cancellation cause, or nil on clean completion.
func execInCtx(ctx context.Context, fn func(*pipeEnv) error) error {
	// environment
	var env pipeEnv
	// derive a context whose cancellation cause carries the first error
	env.ctx, env.cancel = context.WithCancelCause(ctx)
	// cancel the context upon completion; on panic, record errPanic as the
	// cause (so blocked goroutines unwind) and re-raise the panic
	defer func() {
		if p := recover(); p != nil {
			env.cancel(errPanic)
			panic(p)
		}
		// NOTE: this deferred cancel(nil) runs only after the return value below
		// has been computed via context.Cause, so it cannot clobber the cause.
		env.cancel(nil)
	}()
	// run; a non-nil result becomes the cancellation cause
	env.safe(fn(&env))
	// wait for all threads started in this environment to terminate
	env.wg.Wait()
	// all done: report the first recorded error, if any
	return context.Cause(env.ctx)
}
// startFeeder launches a goroutine that runs the source generator, writing its
// output to the returned channel. The channel is closed when the source is
// exhausted (or errors out), and any source error becomes the pipe's
// cancellation cause.
func startFeeder[T any](env *pipeEnv, src Gen[T]) <-chan T {
	ch := make(chan T, chanCap)
	env.wg.Add(1)
	go func() {
		defer env.wg.Done() // runs after close(ch), preserving the original order
		defer close(ch)
		env.safe(src(toChan(env, ch)))
	}()
	return ch
}
// yield from the channel
func readChan[T any](ch <-chan T, yield func(T) error) (err error) {
for item := range ch {
if err = yield(item); err != nil {
break
}
}
return
}
// toChan constructs a yield function that writes each item to the given channel,
// aborting with the context's error if the pipe is cancelled while blocked.
func toChan[T any](env *pipeEnv, ch chan<- T) func(T) error {
	return func(item T) error {
		select {
		case <-env.ctx.Done():
			return env.ctx.Err()
		case ch <- item:
		}
		return nil
	}
}
// errPanic is recorded as the cancellation cause when a pipeline goroutine
// panics, unblocking the remaining workers before the panic is re-raised.
var errPanic = errors.New("pipeline panicked")

// ErrStop signals early exit from a range-over-function loop. It is not stored in
// It.Err, but within a stage function in some (probably, rare) situations it may be
// treated as a special case.
var ErrStop = errors.New("pipeline cancelled")
// FromSeq constructs a generator from the given standard-library iterator.
func FromSeq[T any](src iter.Seq[T]) Gen[T] {
	return func(yield func(T) error) error {
		for item := range src {
			if err := yield(item); err != nil {
				return err
			}
		}
		return nil
	}
}
// FromSlice constructs a generator that reads data from the given slice, in order.
// In Go v1.23 it saves a few nanoseconds per iteration when compared to FromSeq(slices.Values(src)).
func FromSlice[S ~[]T, T any](src S) Gen[T] {
	return func(yield func(T) error) error {
		for i := range src {
			if err := yield(src[i]); err != nil {
				return err
			}
		}
		return nil
	}
}
// All constructs a generator that invokes all the given generators one after
// another, in order, stopping at the first error.
func All[T any](srcs ...Gen[T]) Gen[T] {
	return func(yield func(T) error) error {
		for _, g := range srcs {
			if err := g(yield); err != nil {
				return err
			}
		}
		return nil
	}
}