Skip to content

Commit

Permalink
fix(chain-storage): tighten up deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jun 3, 2022
1 parent d467a19 commit 528e040
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 53 deletions.
6 changes: 3 additions & 3 deletions packages/chain-streams/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
This [Agoric](https://agoric.com) Chain Streams package consumes data server
publication leaders in a flexible, future-proof way.

TL;DR: The quick way to consume a mailbox stream is to do:
TL;DR: You can run `yarn demo`, or to consume a mailbox stream do:
```sh
npx agoric stream http://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz
npx agoric stream -Bhttp://devnet.agoric.net/network-config :mailbox.agoric1foobarbaz -otext
```

An example of following an on-chain mailbox in codd (using this package) is:
Expand Down Expand Up @@ -38,7 +38,7 @@ The `streamOpts` argument in `makeChainStream(leader, key, streamOpts)` provides
- the `integrity` option, which has three possibilities:
- `'strict'` - release data only after proving it was validated (may incur waits for one block's data to be validated in the next block),
- `'optimistic'` (default) - release data immediately, but may crash the stream in the future if an already-released value could not be proven,
- `'trusting'` - release data immediately without validation
- `'none'` - release data immediately without validation
- the `decode` option is a function to translate `buf: Uint8Array` into `data: string`
- (default) - interpret buf as a utf-8 string, then `JSON.parse` it
- the `unserializer` option can be
Expand Down
2 changes: 1 addition & 1 deletion packages/chain-streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"repository": "https://github.com/Agoric/agoric-sdk",
"scripts": {
"build": "exit 0",
"dev": "node -e 'import(\"./test/fake-rpc-server.js\").then(ns => ns.develop())'",
"demo": "node -e 'import(\"./test/fake-rpc-server.js\").then(ns => ns.develop())'",
"test": "ava",
"test:c8": "c8 $C8_OPTIONS ava --config=ava-nesm.config.js",
"test:xs": "exit 0",
Expand Down
2 changes: 0 additions & 2 deletions packages/chain-streams/src/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import { DEFAULT_RETRY_CALLBACK } from './defaults.js';
import { shuffle } from './shuffle.js';
import { makePollingWatcher } from './watcher.js';

const { details: X } = assert;

/**
* Create a chain leader that rotates through a list of endpoints.
*
Expand Down
67 changes: 36 additions & 31 deletions packages/chain-streams/src/stream-cosmjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const integrityToQueryVerifier = harden({
// Crash hard if we can't prove.
return getProvenValue().catch(crash);
},
trusting: async (_getProvenValue, _crash, getAllegedValue) => {
none: async (_getProvenValue, _crash, getAllegedValue) => {
// Fast and loose.
return getAllegedValue();
},
Expand Down Expand Up @@ -119,8 +119,7 @@ export const makeChainStream = async (leader, storeKey, options = {}) => {
const makeQuerier =
method =>
/**
* @param {number} [_height]
* @param height
* @param {number} [height]
* @returns {Promise<unknown>}
*/
async height => {
Expand Down Expand Up @@ -161,19 +160,35 @@ export const makeChainStream = async (leader, storeKey, options = {}) => {
};

const crash = err => {
console.error(`PROOF VERIFICATION FAILURE; crashing follower`, err);
fail(err);
if (crasher) {
E(crasher).crash(err);
E(crasher).crash(
`PROOF VERIFICATION FAILURE; crashing follower`,
err,
);
} else {
console.error(`PROOF VERIFICATION FAILURE; crashing follower`, err);
}
};

/** @type {Uint8Array} */
const retryOrFail = err =>
E(leader)
.retry(err)
.catch(e => {
fail(e);
throw e;
});

let lastTicket = 0n;
let lastBuf;

/**
* @param {import('./types').ChainStoreChange} allegedChange
*/
const queryAndUpdateOnce = async allegedChange => {
lastTicket += 1n;
const ticket = lastTicket;

// Make an unproven query if we have no alleged value.
const { values: allegedValues, blockHeight: allegedBlockHeight } =
allegedChange;
Expand All @@ -182,52 +197,42 @@ export const makeChainStream = async (leader, storeKey, options = {}) => {
? () => Promise.resolve(allegedValues.at(-1))
: () => getUnprovenValueAtHeight(allegedBlockHeight);
const getProvenValue = () => getProvenValueAtHeight(allegedBlockHeight);

const buf = await queryVerifier(getProvenValue, crash, getAllegedValue);
if (buf.length === 0) {
return true;
if (ticket !== lastTicket) {
// Out of order!
return;
}
if (lastBuf && buf.length === lastBuf.length) {
// We may have the same value.
let i = 0;
while (i < buf.length && buf[i] === lastBuf[i]) {
i += 1;
}
if (i === buf.length) {
// We do! We do have the same value!
return true;
if (lastBuf) {
if (buf.length === lastBuf.length) {
if (buf.every((v, i) => v === lastBuf[i])) {
// Duplicate!
return;
}
}
}
lastBuf = buf;
const data = decode(buf);
if (!unserializer) {
updater.updateState(/** @type {T} */ (data));
return true;
return;
}
const value = await E(unserializer).unserialize(data);
updater.updateState(value);
return true;
};

const allegedKeyChanges = E(leader).watchStoreKey(storeKey);
const changeStream = E(leader).watchStoreKey(storeKey);
const queryWhenKeyChanges = async () => {
for await (const allegedChange of iterateLatest(allegedKeyChanges)) {
for await (const allegedChange of iterateLatest(changeStream)) {
if (finished) {
return;
}
harden(allegedChange);
const keepGoing = await queryAndUpdateOnce(allegedChange).catch(
async e => {
// Keep going after backoff.
await E(leader).retry(e);
return true;
},
);
if (!keepGoing) {
return;
}
await queryAndUpdateOnce(allegedChange).catch(retryOrFail);
}
};

await queryAndUpdateOnce({ values: [], storeKey }).catch(retryOrFail);
queryWhenKeyChanges().catch(fail);

// Ensure we get at least one value before returning.
Expand Down
4 changes: 2 additions & 2 deletions packages/chain-streams/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ export {};

/**
* @typedef {object} Crasher
* @property {(err: any) => void} crash
* @property {(...args: unknown[]) => void} crash
*/

/**
* @typedef {object} ChainStreamOptions
* @property {null | import('@endo/far').FarRef<Unserializer>} [unserializer]
* @property {(buf: Uint8Array) => any} [decode]
* @property {'strict'|'optimistic'|'trusting'} [integrity]
* @property {'strict'|'optimistic'|'none'} [integrity]
* @property {import('@endo/far').FarRef<Crasher>} [crasher]
*/

Expand Down
6 changes: 3 additions & 3 deletions packages/chain-streams/test/fake-rpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ export const startFakeServer = (t, fakeValues) => {

export const jsonPairs = harden([
// Justin is the same as the JSON encoding but without unnecessary quoting
['{"@qclass":"undefined"}', 'undefined'],
['[1,2]', '[1,2]'],
['{"foo":1}', '{foo:1}'],
['{"a":1,"b":2}', '{a:1,b:2}'],
Expand Down Expand Up @@ -222,9 +221,10 @@ export const develop = async () => {
};
const PORT = await startFakeServer(mockT, [...fakeValues]);
console.log(
`Try in another terminal: agoric follow http://localhost:${PORT}/network-config`,
`Try this in another terminal:
agoric stream :fake.path --bootstrap=http://localhost:${PORT}/network-config --sleep=0.5 --integrity=none`,
);
console.log(`Control-C to interrupt...`);
console.warn(`Control-C to interrupt...`);
// Wait forever.
await new Promise(() => {});
};
21 changes: 10 additions & 11 deletions packages/chain-streams/test/test-mvp.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@ test('happy path', async t => {
t.plan(expected.length);
const PORT = await t.context.startServer(t, [...expected]);
/** @type {import('../src/types.js').ChainLeaderOptions} */
const testLeaderOptions = {
const lo = {
retryCallback: null, // fail fast, no retries
keepPolling: () => delay(200).then(() => true), // poll really quickly
};
/** @type {import('../src/types.js').ChainStreamOptions} */
const testStreamOptions = {
integrity: 'trusting',
const so = {
integrity: 'none',
};

// The rest of this test is taken almost verbatim from the README.md, with
// some minor modifications (testLeaderOptions and deepEqual).
const leader = makeLeader(
`http://localhost:${PORT}/network-config`,
testLeaderOptions,
);
const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo);
const storeKey = makeStoreKey(':mailbox.agoric1foobarbaz');
const stream = await makeChainStream(leader, storeKey, testStreamOptions);
for await (const obj of iterateLatest(stream)) {
t.log('got', obj);
t.deepEqual(obj, expected.shift());
const stream = await makeChainStream(leader, storeKey, so);
for await (const mailbox of iterateLatest(stream)) {
t.log(`here's a mailbox object`, mailbox);

// The rest here is to drive the test.
t.deepEqual(mailbox, expected.shift());
if (expected.length === 0) {
break;
}
Expand Down

0 comments on commit 528e040

Please sign in to comment.