At Signal-AI we process and enrich millions of documents a day.
Most of our components are written in Clojure and we leverage channels and asynchronous programming using the fantastic core.async library.
I want to share some findings around the use of partition when dealing with channels, and in particular I want to highlight how memory usage can get out of hand. Finally, I want to propose a solution that keeps memory allocation under control.
Please note that there might be better and more elegant solutions. If so, please reach out, as I am eager to hear about them :)
The following is a fabricated example to keep things as simple as possible.
Let's imagine we have a stream of content coming in. The documents in this stream vary in size, in the range of 1-100KB. We want to process each document, run some business logic and finally write it to an S3 bucket. In order to minimise the number of S3 writes, we want to batch documents in bundles of 50000.
To give some context, let's imagine we have a set of functions that take care of writing the stream to the target system. The next image shows some partial boilerplate code.
Normally I would have functions like the ones below, where split-in-chunks makes use of the partition-all transducer. This function returns a channel where each element is a partition of data based on the partition size (50000 in this example).
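As a minimal sketch of the split-in-chunks function described above (the original code is not reproduced here, so the body is an assumption), the partition-all transducer can be attached directly to an output channel and fed with pipe:

```clojure
(ns example.split-in-chunks
  (:require [clojure.core.async :as a]))

(defn split-in-chunks
  "Sketch: returns a channel whose values are vectors of up to n elements
  taken from in. The partition-all transducer on the output channel only
  emits a vector once it holds n elements or in has closed."
  [in n]
  (let [out (a/chan 1 (partition-all n))]
    ;; pipe copies every value from in to out and closes out when in closes;
    ;; closing out runs the transducer's completion step, which flushes the
    ;; last (possibly smaller) chunk.
    (a/pipe in out)
    out))
```

Feeding (range 5) through it with n = 2 yields [0 1], then [2 3], and finally [4], which only appears once the input channel closes. Until a chunk is emitted, all of its elements sit inside the transducer's internal buffer.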
The issue here is that partition-all yields a new partition only when it is full or when no more elements are available (the input channel has closed). Until then every element of the current partition is held in memory, so the memory used by a single partition grows linearly with the partition size, multiplied by the size of each element. For instance, with a partition size of 50000:
- element size 10KB -> partition memory 500000KB
- element size 100KB -> partition memory 5000000KB
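The arithmetic behind these figures, with n documents per partition of s bytes each, using decimal units (1 MB = 1000 KB) and ignoring per-object JVM overhead:

```latex
\begin{aligned}
M_{\text{partition}} &\approx n \cdot s \\
n = 50000,\; s = 10\,\text{KB} &\;\Rightarrow\; M_{\text{partition}} \approx 500\,000\,\text{KB} = 500\,\text{MB} \\
n = 50000,\; s = 100\,\text{KB} &\;\Rightarrow\; M_{\text{partition}} \approx 5\,000\,000\,\text{KB} = 5\,\text{GB}
\end{aligned}
```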
This type of implementation forced us to allocate more memory than is actually needed. The code is also complex, as we have to move from streams (using channels) to collections (generated by partition-all) and back to streams (using async/to-chan).
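The round trip from channels to collections and back can be sketched as follows. write-chunks! and write-stream! are hypothetical names: write-stream! stands in for whatever actually uploads one bundle to S3, and is assumed to take a bundle id and a channel of documents and to return a channel that yields once the write is done.

```clojure
(ns example.write-chunks
  (:require [clojure.core.async :as a]))

(defn write-chunks!
  "Hypothetical consumer for a channel of collections (e.g. the output of a
  partition-all based split). Each collection is turned back into a channel
  and handed to write-stream!. Returns a channel that closes once every
  chunk has been written."
  [chunks write-stream! ]
  (a/go-loop [bundle-id 0]
    (when-some [chunk (a/<! chunks)]
      ;; collection -> stream again; the whole chunk is still in memory here.
      ;; Newer core.async versions deprecate to-chan in favour of to-chan!.
      (a/<! (write-stream! bundle-id (a/to-chan chunk)))
      (recur (inc bundle-id)))))
```

Each bundle is fully materialised as a vector before the writer ever sees its first document, which is where the extra memory goes.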
For these reasons I started wondering whether there is a better way, and this is what I came up with:
This function takes an input channel and a partition size and immediately returns a channel whose elements are themselves channels, each carrying one fixed-size partition.
Basically, instead of having a channel of fixed-size collections, we have a channel of fixed-size channels.
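A minimal sketch of how a partition-in-channels function could behave, assuming unbuffered inner channels; this is one possible implementation of the idea, not necessarily the code in the repository linked at the end of the post:

```clojure
(ns example.partition-in-channels
  (:require [clojure.core.async :as a]))

(defn partition-in-channels
  "Sketch: returns a channel of channels. Each inner channel carries at most
  n consecutive values from in and is closed once it has carried n values
  or in has closed. Inner channels are unbuffered, so this function holds
  at most one value at a time."
  [in n]
  (let [out (a/chan)]
    (a/go-loop []
      (if-some [v (a/<! in)]
        (let [part (a/chan)]
          ;; hand the new partition to the consumer before filling it
          (a/>! out part)
          (a/>! part v)
          ;; copy up to n-1 more values; evaluates to false if in closed early
          (let [more? (loop [i 1]
                        (if (< i n)
                          (if-some [v (a/<! in)]
                            (do (a/>! part v)
                                (recur (inc i)))
                            false)
                          true))]
            (a/close! part)
            (if more?
              (recur)
              (a/close! out))))
        ;; input exhausted: no empty trailing partition is emitted
        (a/close! out)))
    out))
```

Because the inner channels are unbuffered, the consumer has to drain each partition before the next one can be filled; in exchange, memory no longer grows with the partition size.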
The code that makes use of the partition-in-channels function is slightly simpler too:
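A sketch of a consumer for a channel of channels, such as the one partition-in-channels returns. write-partitions! and count-chars! are hypothetical: count-chars! stands in for a streaming S3 writer and uses core.async's reduce so that documents are consumed one by one without building a collection.

```clojure
(ns example.write-partitions
  (:require [clojure.core.async :as a]))

(defn count-chars!
  "Hypothetical stand-in for a streaming writer: consumes one partition value
  by value and returns a channel yielding [bundle-id total-chars]."
  [bundle-id part]
  (a/go
    (let [total (a/<! (a/reduce (fn [acc doc] (+ acc (count doc))) 0 part))]
      [bundle-id total])))

(defn write-partitions!
  "Hands each inner channel straight to write-stream! and only takes the
  next partition once the current write has finished. Returns a channel
  yielding the vector of write results."
  [parts write-stream!]
  (a/go-loop [bundle-id 0
              results []]
    (if-some [part (a/<! parts)]
      (let [result (a/<! (write-stream! bundle-id part))]
        (recur (inc bundle-id) (conj results result)))
      results)))
```

There is no conversion from collections back to channels: each partition flows from the input to the writer as a stream.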
As a result, some of our systems can now run with one fourth of the memory.
Thanks for making it this far! If you have any comments or improvements, feel free to contact me on Twitter. The source code is available in this GitHub repo: https://github.com/ed0t/async-partitioner