diff --git a/README.md b/README.md index a19f7e2..cf7c917 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,9 @@ If you have another common use case you would like to see covered by this packag ## Cookbook -* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled) -* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError) -* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed) +* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#pipelineshutsdownwhencontaineriskilled) +* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#pipelineshutsdownonerror) +* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#pipelineshutsdownwheninputchannelisclosed) ## Functions @@ -232,6 +232,7 @@ ProcessBatchConcurrently fans the in channel out to multiple batch Processors ru then it fans the out channels of the batch Processors back into a single out chan ```golang + // Create a context that times out after 5 seconds ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -253,19 +254,17 @@ p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProces for result := range p { fmt.Printf("result: %d\n", result) } -``` - Output: +// Example Output: +// result: 1 +// result: 2 +// result: 3 +// result: 5 +// error: could not process [7 8], context deadline exceeded +// error: could not process [4 6], context deadline exceeded +// error: could not process [9], context deadline exceeded ``` -result: 1 -result: 2 -result: 3 -result: 5 -error: could not process [7 8], context deadline exceeded -error: could not process [4 6], context deadline exceeded -error: could not process [9], context deadline exceeded -``` ### func [ProcessConcurrently](/process.go#L26) @@ -275,6 +274,7 @@ ProcessConcurrently fans the in channel out to multiple Processors running concu then it fans the out channels of the Processors back into a single out chan ```golang + // Create a context that times out after 5 seconds ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -296,19 +296,17 @@ p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context. for result := range p { log.Printf("result: %d\n", result) } -``` - Output: +// Example Output: +// result: 2 +// result: 1 +// result: 4 +// result: 3 +// error: could not process 6, process was canceled +// error: could not process 5, process was canceled +// error: could not process 7, context deadline exceeded ``` -result: 2 -result: 1 -result: 4 -result: 3 -error: could not process 6, process was canceled -error: could not process 5, process was canceled -error: could not process 7, context deadline exceeded -``` ### func [Split](/split.go#L4) diff --git a/cancel_test.go b/cancel_test.go index 59f5e13..e241760 100644 --- a/cancel_test.go +++ b/cancel_test.go @@ -41,7 +41,7 @@ func TestCancel(t *testing.T) { // Start canceling the pipeline about half way through the test ctx, cancel := context.WithTimeout(context.Background(), testDuration/2) defer cancel() - for i := range Cancel[int](ctx, canceled, in) { + for i := range Cancel(ctx, canceled, in) { logf("%d", i) } diff --git a/collect_test.go b/collect_test.go index 023ee87..c52abca 100644 --- a/collect_test.go +++ b/collect_test.go @@ -124,7 +124,7 @@ func TestCollect(t *testing.T) { defer cancel() // Collect responses - collect := Collect[int](ctx, test.args.maxSize, test.args.maxDuration, in) + collect := Collect(ctx, test.args.maxSize, test.args.maxDuration, in) timeout := time.After(maxTestDuration) var outs [][]int var isOpen bool diff --git a/process_batch_example_test.go b/process_batch_example_test.go index 842afc3..3edfd7f 100644 --- a/process_batch_example_test.go +++ b/process_batch_example_test.go @@ -61,7 +61,7 @@ func ExampleProcessBatchConcurrently() { fmt.Printf("result: %d\n", result) } - // Output: + // Example Output: // result: 1 // result: 2 // result: 3 diff --git a/process_example_test.go b/process_example_test.go index 8fff02c..f70a1f8 100644 --- a/process_example_test.go +++ b/process_example_test.go @@ -61,7 +61,7 @@ func ExampleProcessConcurrently() { log.Printf("result: %d\n", result) } - // Output: + // Example Output: // result: 2 // result: 1 // result: 4