diff --git a/packages/vats/src/lib-chainPublish.js b/packages/vats/src/lib-chainPublish.js
new file mode 100644
index 00000000000..d16bb20a14f
--- /dev/null
+++ b/packages/vats/src/lib-chainPublish.js
@@ -0,0 +1,46 @@
+// @ts-check
+
+import { E } from '@endo/far';
+
+/**
+ * Publish results of an async iterable into a chain storage node as an array
+ * serialized using JSON.stringify or an optionally provided function (e.g.,
+ * leveraging a serialize function from makeMarshal).
+ *
+ * @param {AsyncIterable<unknown>} iterable
+ * @param {ReturnType<typeof import('./lib-chainStorage.js').makeChainStorageRoot>} chainStorageNode
+ * @param {{ timerService: ERef<TimerService>, serialize?: (obj: any) => string }} powers
+ */
+export async function publishToChainNode(
+  iterable,
+  chainStorageNode,
+  { timerService, serialize = JSON.stringify },
+) {
+  let nextIndex = 0n;
+  let isNewBlock = true;
+  let oldResults = [];
+  let results = [];
+  for await (const result of iterable) {
+    if (isNewBlock) {
+      isNewBlock = false;
+      oldResults = results;
+      results = [];
+      E(timerService)
+        .delay(1n)
+        .then(() => {
+          isNewBlock = true;
+        });
+    }
+    // To avoid loss when detecting the new block *after* already consuming
+    // results produced within it, we associate each result with an index and
+    // include "old" results in the batch.
+    // Downstream consumers are expected to deduplicate by index.
+    // We represent the index as a string to maintain compatibility with
+    // JSON.stringify and to avoid overly inflating the data size (e.g. with
+    // "@qclass" objects from makeMarshal serialize functions).
+    results.push([String(nextIndex), result]);
+    nextIndex += 1n;
+    const batch = harden(oldResults.slice().concat(results));
+    E(chainStorageNode).setValue(serialize(batch));
+  }
+}
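
For orientation, here is a minimal usage sketch of `publishToChainNode` (not part of the diff). It assumes a SES-hardened environment (obtained here via `@endo/init`), uses `makeSubscriptionKit` from `@agoric/notifier` as the async-iterable source, and substitutes hypothetical local stand-ins for the chain storage node and timer service that bootstrap code would normally supply.

```js
// A usage sketch under stated assumptions; the storage node and timer service
// below are local stand-ins, not the real chain-provided objects.
import '@endo/init'; // lockdown() provides the `harden` used by the library
import { Far } from '@endo/far';
import { makeSubscriptionKit } from '@agoric/notifier';
import { publishToChainNode } from './lib-chainPublish.js';

// Hypothetical stand-in for a chain storage node: records each serialized batch.
const chainStorageNode = Far('fakeStorageNode', {
  setValue: value => console.log('storage update:', value),
});

// Hypothetical stand-in for a timer service: delay() resolves on a later turn,
// which publishToChainNode treats as a new-block boundary.
const timerService = Far('fakeTimerService', {
  delay: _relativeTime => Promise.resolve(),
});

const { publication, subscription } = makeSubscriptionKit();

// Consume the subscription in the background; each result is appended to the
// current batch as an [index, value] pair and written to the storage node.
void publishToChainNode(subscription, chainStorageNode, { timerService });

// Producing values triggers storage updates (serialized with JSON.stringify
// unless a custom serialize function is passed in `powers`).
publication.updateState({ hello: 'world' });
publication.updateState({ hello: 'again' });
publication.finish({ hello: 'done' });
```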