Skip to content

Commit

Permalink
feat(casting): Update to consume stream cells
Browse files Browse the repository at this point in the history
Fixes #5366
  • Loading branch information
gibson042 committed Aug 12, 2022
1 parent 5b76010 commit 60e0d6f
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 48 deletions.
41 changes: 30 additions & 11 deletions packages/casting/src/follower-cosmjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ export const makeCosmjsFollower = (
* @param {import('./types').CastingChange} allegedChange
*/
const tryQueryAndUpdate = async allegedChange => {
const committer = prepareUpdateInOrder();
let committer = prepareUpdateInOrder();

// Make an unproven query if we have no alleged value.
const { values: allegedValues, blockHeight: allegedBlockHeight } =
Expand Down Expand Up @@ -334,18 +334,36 @@ export const makeCosmjsFollower = (
}
}
lastBuf = buf;
const data = decode(buf);
if (!unserializer) {
/** @type {T} */
const value = data;
committer.commit({ value });
return;
let streamCell = decode(buf);
// Upgrade a naked value to a JSON stream cell if necessary.
if (!streamCell.height || !streamCell.values) {
streamCell = { values: [JSON.stringify(streamCell)] };
}
const value = await E(unserializer).unserialize(data);
if (!committer.isValid()) {
return;
for (let i = 0; i < streamCell.values.length; i += 1) {
const data = JSON.parse(streamCell.values[i]);
const last = i + 1 === streamCell.values.length;
if (!unserializer) {
/** @type {T} */
const value = data;
committer.commit({ value });
if (!last) {
committer = prepareUpdateInOrder();
}
// eslint-disable-next-line no-continue
continue;
}
// eslint-disable-next-line no-await-in-loop,@jessie.js/no-nested-await
const value = await E(unserializer).unserialize(data);
if (!committer.isValid()) {
// QUESTION: How would we get here, and what is the proper handling?
// eslint-disable-next-line no-continue
continue;
}
committer.commit({ value });
if (!last) {
committer = prepareUpdateInOrder();
}
}
committer.commit({ value });
};

const changeFollower = E(leader).watchCasting(castingSpecP);
Expand All @@ -362,6 +380,7 @@ export const makeCosmjsFollower = (
return;
}
harden(allegedChange);
// eslint-disable-next-line @jessie.js/no-nested-await
await queryAndUpdateOnce(allegedChange);
}
};
Expand Down
37 changes: 30 additions & 7 deletions packages/casting/test/fake-rpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,18 @@ const fakeStatusResult = {
},
};

export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
/**
* @param {Assertions} t
* @param {Array<{any}>} fakeValues
* @param {object} [options]
* @param {Marshaller} [options.marshaller]
* @param {number} [options.batchSize] count of stream-cell results per response, or 0/absent to return lone naked values
*/
export const startFakeServer = (t, fakeValues, options = {}) => {
const { log = console.log } = t;
lastPort += 1;
const PORT = lastPort;
const { marshaller = makeMarshal(), batchSize = 0 } = options;
return new Promise(resolve => {
log('starting http server on port', PORT);
const app = express();
Expand Down Expand Up @@ -97,6 +105,8 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
buf.set(ascii, dataPrefix.length);
return toBase64(buf);
};
let height = 74863;
let responseValueBase64;
app.post('/tendermint-rpc', (req, res) => {
log('received', req.path, req.body, req.params);
const reply = result => {
Expand All @@ -114,10 +124,23 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
break;
}
case 'abci_query': {
const value =
fakeValues.length === 0
? null
: encode(marshaller.serialize(fakeValues.shift()));
height += 2;
const values = fakeValues.splice(0, Math.max(1, batchSize));
if (values.length > 0) {
if (batchSize > 0) {
// Return a JSON stream cell.
const serializedValues = values.map(val =>
JSON.stringify(marshaller.serialize(val)),
);
responseValueBase64 = encode({
height: String(height - 1),
values: serializedValues,
});
} else {
// Return a single naked value.
responseValueBase64 = encode(marshaller.serialize(values[0]));
}
}
const result = {
response: {
code: 0,
Expand All @@ -127,9 +150,9 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
key: Buffer.from(
'swingset/data:mailbox.agoric1foobarbaz',
).toString('base64'),
value,
value: responseValueBase64,
proofOps: null,
height: '74863',
height: String(height),
codespace: '',
},
};
Expand Down
81 changes: 51 additions & 30 deletions packages/casting/test/test-mvp.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,60 @@ import {
import { delay } from '../src/defaults.js';
import { startFakeServer } from './fake-rpc-server.js';

test('happy path', async t => {
const expected = ['latest', 'later', 'done'];
t.plan(expected.length);
const PORT = await t.context.startServer(t, [...expected]);
/** @type {import('../src/types.js').LeaderOptions} */
const lo = {
retryCallback: null, // fail fast, no retries
keepPolling: () => delay(200).then(() => true), // poll really quickly
jitter: null, // no jitter
};
/** @type {import('../src/types.js').FollowerOptions} */
const so = {
proof: 'none',
};
// TODO: Replace with test.macro({title, exec}).
const testHappyPath = (label, ...input) => {
// eslint-disable-next-line no-shadow
const title = label => `happy path ${label}`;
const makeExec =
({ fakeValues, options }) =>
async t => {
const expected = fakeValues;
t.plan(expected.length);
const PORT = await t.context.startFakeServer(t, [...expected], options);
/** @type {import('../src/types.js').LeaderOptions} */
const lo = {
retryCallback: null, // fail fast, no retries
keepPolling: () => delay(200).then(() => true), // poll really quickly
jitter: null, // no jitter
};
/** @type {import('../src/types.js').FollowerOptions} */
const so = {
proof: 'none',
};

// The rest of this test is taken almost verbatim from the README.md, with
// some minor modifications (testLeaderOptions and deepEqual).
const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo);
const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz');
const follower = await makeFollower(castingSpec, leader, so);
for await (const { value } of iterateLatest(follower)) {
t.log(`here's a mailbox value`, value);
// The rest of this test is taken almost verbatim from the README.md, with
// some minor modifications (testLeaderOptions and deepEqual).
const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo);
const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz');
const follower = await makeFollower(castingSpec, leader, so);
for await (const { value } of iterateLatest(follower)) {
t.log(`here's a mailbox value`, value);

// The rest here is to drive the test.
t.deepEqual(value, expected.shift());
if (expected.length === 0) {
break;
}
}
// The rest here is to drive the test.
t.deepEqual(value, expected.shift());
if (expected.length === 0) {
break;
}
}
};
test(title(label), makeExec(...input));
};

testHappyPath('naked values', {
fakeValues: ['latest', 'later', 'done'],
options: {},
});
testHappyPath('batchSize=1', {
fakeValues: ['latest', 'later', 'done'],
options: { batchSize: 1 },
});
testHappyPath('batchSize=2', {
fakeValues: ['latest', 'later', 'done'],
options: { batchSize: 2 },
});

test('bad network config', async t => {
const PORT = await t.context.startServer(t, []);
const PORT = await t.context.startFakeServer(t, []);
await t.throwsAsync(
() =>
makeLeader(`http://localhost:${PORT}/bad-network-config`, {
Expand All @@ -58,7 +79,7 @@ test('bad network config', async t => {
});

test('missing rpc server', async t => {
const PORT = await t.context.startServer(t, []);
const PORT = await t.context.startFakeServer(t, []);
await t.throwsAsync(
() =>
makeLeader(`http://localhost:${PORT}/missing-network-config`, {
Expand All @@ -83,7 +104,7 @@ test('unrecognized proof', async t => {

test.before(t => {
t.context.cleanups = [];
t.context.startServer = startFakeServer;
t.context.startFakeServer = startFakeServer;
});

test.after(t => {
Expand Down

0 comments on commit 60e0d6f

Please sign in to comment.