From 60e0d6fe0a47247a3d933d9a9dabca6dcef8a30f Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Fri, 12 Aug 2022 05:39:23 -0400 Subject: [PATCH] feat(casting): Update to consume stream cells Fixes #5366 --- packages/casting/src/follower-cosmjs.js | 41 ++++++++---- packages/casting/test/fake-rpc-server.js | 37 +++++++++-- packages/casting/test/test-mvp.js | 81 +++++++++++++++--------- 3 files changed, 111 insertions(+), 48 deletions(-) diff --git a/packages/casting/src/follower-cosmjs.js b/packages/casting/src/follower-cosmjs.js index 0a95a278f6e7..66f40b340365 100644 --- a/packages/casting/src/follower-cosmjs.js +++ b/packages/casting/src/follower-cosmjs.js @@ -305,7 +305,7 @@ export const makeCosmjsFollower = ( * @param {import('./types').CastingChange} allegedChange */ const tryQueryAndUpdate = async allegedChange => { - const committer = prepareUpdateInOrder(); + let committer = prepareUpdateInOrder(); // Make an unproven query if we have no alleged value. const { values: allegedValues, blockHeight: allegedBlockHeight } = @@ -334,18 +334,36 @@ export const makeCosmjsFollower = ( } } lastBuf = buf; - const data = decode(buf); - if (!unserializer) { - /** @type {T} */ - const value = data; - committer.commit({ value }); - return; + let streamCell = decode(buf); + // Upgrade a naked value to a JSON stream cell if necessary. + if (!streamCell.height || !streamCell.values) { + streamCell = { values: [JSON.stringify(streamCell)] }; } - const value = await E(unserializer).unserialize(data); - if (!committer.isValid()) { - return; + for (let i = 0; i < streamCell.values.length; i += 1) { + const data = JSON.parse(streamCell.values[i]); + const last = i + 1 === streamCell.values.length; + if (!unserializer) { + /** @type {T} */ + const value = data; + committer.commit({ value }); + if (!last) { + committer = prepareUpdateInOrder(); + } + // eslint-disable-next-line no-continue + continue; + } + // eslint-disable-next-line no-await-in-loop,@jessie.js/no-nested-await + const value = await E(unserializer).unserialize(data); + if (!committer.isValid()) { + // QUESTION: How would we get here, and what is the proper handling? + // eslint-disable-next-line no-continue + continue; + } + committer.commit({ value }); + if (!last) { + committer = prepareUpdateInOrder(); + } } - committer.commit({ value }); }; const changeFollower = E(leader).watchCasting(castingSpecP); @@ -362,6 +380,7 @@ export const makeCosmjsFollower = ( return; } harden(allegedChange); + // eslint-disable-next-line @jessie.js/no-nested-await await queryAndUpdateOnce(allegedChange); } }; diff --git a/packages/casting/test/fake-rpc-server.js b/packages/casting/test/fake-rpc-server.js index 0db8d1ba8bac..118c3646ce5c 100644 --- a/packages/casting/test/fake-rpc-server.js +++ b/packages/casting/test/fake-rpc-server.js @@ -64,10 +64,18 @@ const fakeStatusResult = { }, }; -export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => { +/** + * @param {Assertions} t + * @param {Array<{any}>} fakeValues + * @param {object} [options] + * @param {Marshaller} [options.marshaller] + * @param {number} [options.batchSize] count of stream-cell results per response, or 0/absent to return lone naked values + */ +export const startFakeServer = (t, fakeValues, options = {}) => { const { log = console.log } = t; lastPort += 1; const PORT = lastPort; + const { marshaller = makeMarshal(), batchSize = 0 } = options; return new Promise(resolve => { log('starting http server on port', PORT); const app = express(); @@ -97,6 +105,8 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => { buf.set(ascii, dataPrefix.length); return toBase64(buf); }; + let height = 74863; + let responseValueBase64; app.post('/tendermint-rpc', (req, res) => { log('received', req.path, req.body, req.params); const reply = result => { @@ -114,10 +124,23 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => { break; } case 'abci_query': { - const value = - fakeValues.length === 0 - ? null - : encode(marshaller.serialize(fakeValues.shift())); + height += 2; + const values = fakeValues.splice(0, Math.max(1, batchSize)); + if (values.length > 0) { + if (batchSize > 0) { + // Return a JSON stream cell. + const serializedValues = values.map(val => + JSON.stringify(marshaller.serialize(val)), + ); + responseValueBase64 = encode({ + height: String(height - 1), + values: serializedValues, + }); + } else { + // Return a single naked value. + responseValueBase64 = encode(marshaller.serialize(values[0])); + } + } const result = { response: { code: 0, @@ -127,9 +150,9 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => { key: Buffer.from( 'swingset/data:mailbox.agoric1foobarbaz', ).toString('base64'), - value, + value: responseValueBase64, proofOps: null, - height: '74863', + height: String(height), codespace: '', }, }; diff --git a/packages/casting/test/test-mvp.js b/packages/casting/test/test-mvp.js index d3a3578030ec..e7c7cccef863 100644 --- a/packages/casting/test/test-mvp.js +++ b/packages/casting/test/test-mvp.js @@ -12,39 +12,60 @@ import { import { delay } from '../src/defaults.js'; import { startFakeServer } from './fake-rpc-server.js'; -test('happy path', async t => { - const expected = ['latest', 'later', 'done']; - t.plan(expected.length); - const PORT = await t.context.startServer(t, [...expected]); - /** @type {import('../src/types.js').LeaderOptions} */ - const lo = { - retryCallback: null, // fail fast, no retries - keepPolling: () => delay(200).then(() => true), // poll really quickly - jitter: null, // no jitter - }; - /** @type {import('../src/types.js').FollowerOptions} */ - const so = { - proof: 'none', - }; +// TODO: Replace with test.macro({title, exec}). +const testHappyPath = (label, ...input) => { + // eslint-disable-next-line no-shadow + const title = label => `happy path ${label}`; + const makeExec = + ({ fakeValues, options }) => + async t => { + const expected = fakeValues; + t.plan(expected.length); + const PORT = await t.context.startFakeServer(t, [...expected], options); + /** @type {import('../src/types.js').LeaderOptions} */ + const lo = { + retryCallback: null, // fail fast, no retries + keepPolling: () => delay(200).then(() => true), // poll really quickly + jitter: null, // no jitter + }; + /** @type {import('../src/types.js').FollowerOptions} */ + const so = { + proof: 'none', + }; - // The rest of this test is taken almost verbatim from the README.md, with - // some minor modifications (testLeaderOptions and deepEqual). - const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo); - const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); - const follower = await makeFollower(castingSpec, leader, so); - for await (const { value } of iterateLatest(follower)) { - t.log(`here's a mailbox value`, value); + // The rest of this test is taken almost verbatim from the README.md, with + // some minor modifications (testLeaderOptions and deepEqual). + const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo); + const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz'); + const follower = await makeFollower(castingSpec, leader, so); + for await (const { value } of iterateLatest(follower)) { + t.log(`here's a mailbox value`, value); - // The rest here is to drive the test. - t.deepEqual(value, expected.shift()); - if (expected.length === 0) { - break; - } - } + // The rest here is to drive the test. + t.deepEqual(value, expected.shift()); + if (expected.length === 0) { + break; + } + } + }; + test(title(label), makeExec(...input)); +}; + +testHappyPath('naked values', { + fakeValues: ['latest', 'later', 'done'], + options: {}, +}); +testHappyPath('batchSize=1', { + fakeValues: ['latest', 'later', 'done'], + options: { batchSize: 1 }, +}); +testHappyPath('batchSize=2', { + fakeValues: ['latest', 'later', 'done'], + options: { batchSize: 2 }, }); test('bad network config', async t => { - const PORT = await t.context.startServer(t, []); + const PORT = await t.context.startFakeServer(t, []); await t.throwsAsync( () => makeLeader(`http://localhost:${PORT}/bad-network-config`, { @@ -58,7 +79,7 @@ test('bad network config', async t => { }); test('missing rpc server', async t => { - const PORT = await t.context.startServer(t, []); + const PORT = await t.context.startFakeServer(t, []); await t.throwsAsync( () => makeLeader(`http://localhost:${PORT}/missing-network-config`, { @@ -83,7 +104,7 @@ test('unrecognized proof', async t => { test.before(t => { t.context.cleanups = []; - t.context.startServer = startFakeServer; + t.context.startFakeServer = startFakeServer; }); test.after(t => {