Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Support publishing and consuming chain storage stream cells #5942

Merged
merged 20 commits into from
Aug 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
75b3e38
test(vats): Add tests for forthcoming chainStorage append messages
gibson042 Aug 12, 2022
461204a
feat(vats): Add support for configuring chainStorage nodes as sequences
gibson042 Aug 12, 2022
35db0da
feat(casting): Update to consume stream cells
gibson042 Aug 12, 2022
52de535
feat(scripts): Update get-flattened-publication to consume stream cells
gibson042 Aug 12, 2022
6bf3878
chore(vats): Update typedef to support new StorageMessage method "app…
gibson042 Aug 12, 2022
09e165c
chore(casting): Fix parameter types for fake RPC server
gibson042 Aug 12, 2022
a427c59
feat(vats)!: Switch the "published" chain storage subtree to stream c…
gibson042 Aug 12, 2022
ade09fb
test(scripts): Add test files for get-flattened-publication.sh
gibson042 Aug 12, 2022
47fe9fa
fix(scripts): Fix flattening of falsy values
gibson042 Aug 12, 2022
da04888
style(casting): Accept review suggestions
gibson042 Aug 13, 2022
0b97c21
refactor(scripts): Improve error reporting
gibson042 Aug 14, 2022
a167271
test(scripts): Collapse DIFF_OPTS into DIFF
gibson042 Aug 14, 2022
38ce6ea
test(scripts): Add tests for repeated references of the same slot
gibson042 Aug 14, 2022
bf1f29a
fix(scripts): Default slot iface from prior use
gibson042 Aug 14, 2022
c786022
test(scripts): Add a scripts test runner
gibson042 Aug 14, 2022
929a07c
chore: Commit to stream cell values being strings
gibson042 Aug 17, 2022
40530fd
refactor: Rename stream cell "height" to "blockHeight"
gibson042 Aug 18, 2022
e4da515
refactor(casting): Synchronize stream cell detection with scripts/get…
gibson042 Aug 19, 2022
5328671
chore: Align with latest Tendermint RPC documentation
gibson042 Aug 19, 2022
2ce41da
refactor: Rename get-flattened-publication.sh "blockHeight" to "respo…
gibson042 Aug 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/test-scripts.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: Run scripts tests

on:
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
run-scripts-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: scripts/test/test.sh
36 changes: 36 additions & 0 deletions golang/cosmos/x/vstorage/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package keeper

import (
"bytes"
"encoding/json"
"strconv"
"strings"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -11,6 +13,16 @@ import (
"github.com/Agoric/agoric-sdk/golang/cosmos/x/vstorage/types"
)

// StreamCell is an envelope representing a sequence of values written at a path in a single block.
// It is persisted to storage as a { "blockHeight": "<digits>", "values": ["...", ...] } JSON text
// that off-chain consumers rely upon.
// Many of those consumers *also* rely upon the strings of "values" being valid JSON text
// (cf. scripts/get-flattened-publication.sh), but we do not enforce that in this package.
type StreamCell struct {
BlockHeight string `json:"blockHeight"`
Values []string `json:"values"`
}

// Keeper maintains the link to data storage and exposes getter/setter methods
// for the various parts of the state machine
type Keeper struct {
Expand Down Expand Up @@ -127,6 +139,30 @@ func (k Keeper) SetStorageAndNotify(ctx sdk.Context, path, value string) {
)
}

func (k Keeper) AppendStorageValueAndNotify(ctx sdk.Context, path, value string) error {
blockHeight := strconv.FormatInt(ctx.BlockHeight(), 10)

// Preserve correctly-formatted data within the current block,
// otherwise initialize a blank cell.
currentData := k.GetData(ctx, path)
var cell StreamCell
_ = json.Unmarshal([]byte(currentData), &cell)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error handling

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally suppressed here because we populate cell manually if Unmarshal fails to do so.

if cell.BlockHeight != blockHeight {
cell = StreamCell{BlockHeight: blockHeight, Values: make([]string, 0, 1)}
}

// Append the new value.
cell.Values = append(cell.Values, value)

// Perform the write.
bz, err := json.Marshal(cell)
if err != nil {
return err
}
k.SetStorageAndNotify(ctx, path, string(bz))
return nil
}

func componentsToPath(components []string) string {
return strings.Join(components, types.PathSeparator)
}
Expand Down
8 changes: 8 additions & 0 deletions golang/cosmos/x/vstorage/vstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@ func (sh vstorageHandler) Receive(cctx *vm.ControllerContext, str string) (ret s
keeper.SetStorageAndNotify(cctx.Context, msg.Path, msg.Value)
return "true", nil

case "append":
err = keeper.AppendStorageValueAndNotify(cctx.Context, msg.Path, msg.Value)
if err != nil {
return "", err
}
return "true", nil

case "get":
// Note that "get" does not (currently) unwrap a StreamCell.
value := keeper.GetData(cctx.Context, msg.Path)
if value == "" {
return "null", nil
Expand Down
37 changes: 26 additions & 11 deletions packages/casting/src/follower-cosmjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ export const makeCosmjsFollower = (
* @param {import('./types').CastingChange} allegedChange
*/
const tryQueryAndUpdate = async allegedChange => {
const committer = prepareUpdateInOrder();
let committer = prepareUpdateInOrder();

// Make an unproven query if we have no alleged value.
const { values: allegedValues, blockHeight: allegedBlockHeight } =
Expand Down Expand Up @@ -334,18 +334,32 @@ export const makeCosmjsFollower = (
}
}
lastBuf = buf;
const data = decode(buf);
if (!unserializer) {
/** @type {T} */
const value = data;
committer.commit({ value });
return;
let streamCell = decode(buf);
// Upgrade a naked value to a JSON stream cell if necessary.
if (
streamCell.blockHeight === undefined ||
streamCell.values === undefined
) {
streamCell = { values: [JSON.stringify(streamCell)] };
}
const value = await E(unserializer).unserialize(data);
if (!committer.isValid()) {
return;
for (let i = 0; i < streamCell.values.length; i += 1) {
const data = JSON.parse(streamCell.values[i]);
const isLast = i + 1 === streamCell.values.length;
const value = /** @type {T} */ (
unserializer
? // eslint-disable-next-line no-await-in-loop,@jessie.js/no-nested-await
await E(unserializer).unserialize(data)
: data
);
// QUESTION: How would reach a point where this `isValid()` fails,
// and what is the proper handling?
if (!unserializer || committer.isValid()) {
committer.commit({ value });
if (!isLast) {
committer = prepareUpdateInOrder();
}
}
}
committer.commit({ value });
};

const changeFollower = E(leader).watchCasting(castingSpecP);
Expand All @@ -362,6 +376,7 @@ export const makeCosmjsFollower = (
return;
}
harden(allegedChange);
// eslint-disable-next-line @jessie.js/no-nested-await
await queryAndUpdateOnce(allegedChange);
}
};
Expand Down
48 changes: 37 additions & 11 deletions packages/casting/test/fake-rpc-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,19 @@ const fakeStatusResult = {
},
};

export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
/** @typedef {Partial<import('ava').ExecutionContext<{cleanups: Array<() => void>}>> & {context}} FakeServerTestContext */
/**
* @param {FakeServerTestContext} t
* @param {Array<{any}>} fakeValues
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does {any} mean?

Copy link
Member Author

@gibson042 gibson042 Aug 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, it imposes no constraints beyond existence.

* @param {object} [options]
* @param {Marshaller} [options.marshaller]
* @param {number} [options.batchSize] count of stream-cell results per response, or 0/absent to return lone naked values
*/
export const startFakeServer = (t, fakeValues, options = {}) => {
const { log = console.log } = t;
lastPort += 1;
const PORT = lastPort;
Comment on lines 77 to 78
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: this usually tells me we should listen on 0 and ask the kernel for the port we were assigned.

const { marshaller = makeMarshal(), batchSize = 0 } = options;
return new Promise(resolve => {
log('starting http server on port', PORT);
const app = express();
Expand Down Expand Up @@ -97,6 +106,8 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
buf.set(ascii, dataPrefix.length);
return toBase64(buf);
};
let blockHeight = 74863;
let responseValueBase64;
app.post('/tendermint-rpc', (req, res) => {
log('received', req.path, req.body, req.params);
const reply = result => {
Expand All @@ -114,10 +125,23 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
break;
}
case 'abci_query': {
const value =
fakeValues.length === 0
? null
: encode(marshaller.serialize(fakeValues.shift()));
blockHeight += 2;
const values = fakeValues.splice(0, Math.max(1, batchSize));
if (values.length > 0) {
if (batchSize > 0) {
// Return a JSON stream cell.
const serializedValues = values.map(val =>
JSON.stringify(marshaller.serialize(val)),
);
responseValueBase64 = encode({
blockHeight: String(blockHeight - 1),
values: serializedValues,
});
} else {
// Return a single naked value.
responseValueBase64 = encode(marshaller.serialize(values[0]));
}
}
const result = {
response: {
code: 0,
Expand All @@ -127,9 +151,9 @@ export const startFakeServer = (t, fakeValues, marshaller = makeMarshal()) => {
key: Buffer.from(
'swingset/data:mailbox.agoric1foobarbaz',
).toString('base64'),
value,
value: responseValueBase64,
proofOps: null,
height: '74863',
height: String(blockHeight),
codespace: '',
},
};
Expand Down Expand Up @@ -226,10 +250,12 @@ export const develop = async () => {
unserialize({ body: jsonMarshalled, slots: [] }),
),
);
const mockT = {
log: console.log,
context: { cleanups: [] },
};
const mockT = /** @type {FakeServerTestContext} */ (
/** @type {unknown} */ ({
log: console.log,
context: { cleanups: [] },
})
);
const PORT = await startFakeServer(mockT, [...fakeValues]);
console.log(
`Try this in another terminal:
Expand Down
81 changes: 51 additions & 30 deletions packages/casting/test/test-mvp.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,60 @@ import {
import { delay } from '../src/defaults.js';
import { startFakeServer } from './fake-rpc-server.js';

test('happy path', async t => {
const expected = ['latest', 'later', 'done'];
t.plan(expected.length);
const PORT = await t.context.startServer(t, [...expected]);
/** @type {import('../src/types.js').LeaderOptions} */
const lo = {
retryCallback: null, // fail fast, no retries
keepPolling: () => delay(200).then(() => true), // poll really quickly
jitter: null, // no jitter
};
/** @type {import('../src/types.js').FollowerOptions} */
const so = {
proof: 'none',
};
// TODO: Replace with test.macro({title, exec}).
gibson042 marked this conversation as resolved.
Show resolved Hide resolved
const testHappyPath = (label, ...input) => {
// eslint-disable-next-line no-shadow
const title = label => `happy path ${label}`;
const makeExec =
({ fakeValues, options }) =>
async t => {
const expected = fakeValues;
t.plan(expected.length);
const PORT = await t.context.startFakeServer(t, [...expected], options);
/** @type {import('../src/types.js').LeaderOptions} */
const lo = {
retryCallback: null, // fail fast, no retries
keepPolling: () => delay(200).then(() => true), // poll really quickly
jitter: null, // no jitter
};
/** @type {import('../src/types.js').FollowerOptions} */
const so = {
proof: 'none',
};

// The rest of this test is taken almost verbatim from the README.md, with
// some minor modifications (testLeaderOptions and deepEqual).
const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo);
const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz');
const follower = await makeFollower(castingSpec, leader, so);
for await (const { value } of iterateLatest(follower)) {
t.log(`here's a mailbox value`, value);
// The rest of this test is taken almost verbatim from the README.md, with
// some minor modifications (testLeaderOptions and deepEqual).
const leader = makeLeader(`http://localhost:${PORT}/network-config`, lo);
const castingSpec = makeCastingSpec(':mailbox.agoric1foobarbaz');
const follower = await makeFollower(castingSpec, leader, so);
for await (const { value } of iterateLatest(follower)) {
t.log(`here's a mailbox value`, value);

// The rest here is to drive the test.
t.deepEqual(value, expected.shift());
if (expected.length === 0) {
break;
}
}
// The rest here is to drive the test.
t.deepEqual(value, expected.shift());
if (expected.length === 0) {
break;
}
}
};
test(title(label), makeExec(...input));
};

testHappyPath('naked values', {
fakeValues: ['latest', 'later', 'done'],
options: {},
});
testHappyPath('batchSize=1', {
fakeValues: ['latest', 'later', 'done'],
options: { batchSize: 1 },
});
testHappyPath('batchSize=2', {
fakeValues: ['latest', 'later', 'done'],
options: { batchSize: 2 },
});

test('bad network config', async t => {
const PORT = await t.context.startServer(t, []);
const PORT = await t.context.startFakeServer(t, []);
await t.throwsAsync(
() =>
makeLeader(`http://localhost:${PORT}/bad-network-config`, {
Expand All @@ -58,7 +79,7 @@ test('bad network config', async t => {
});

test('missing rpc server', async t => {
const PORT = await t.context.startServer(t, []);
const PORT = await t.context.startFakeServer(t, []);
await t.throwsAsync(
() =>
makeLeader(`http://localhost:${PORT}/missing-network-config`, {
Expand All @@ -83,7 +104,7 @@ test('unrecognized proof', async t => {

test.before(t => {
t.context.cleanups = [];
t.context.startServer = startFakeServer;
t.context.startFakeServer = startFakeServer;
});

test.after(t => {
Expand Down
2 changes: 1 addition & 1 deletion packages/notifier/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@
* @property {(data: string) => void} setValue publishes some data
* @property {() => ERef<VStorageKey>} getStoreKey get the
* externally-reachable store key for this storage item
* @property {(subPath: string) => StorageNode} makeChildNode
* @property {(subPath: string, options?: {sequence?: boolean}) => StorageNode} makeChildNode
*/

/**
Expand Down
1 change: 1 addition & 0 deletions packages/vats/src/core/chain-behaviors.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ export const makeChainStorage = async ({
bridgeManager,
BRIDGE_ID.STORAGE,
ROOT_PATH,
{ sequence: true },
);
chainStorageP.resolve(rootNodeP);
};
Expand Down
Loading