Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(asyncFlow): Stopgap E support #9519

Merged
merged 13 commits into from
Jun 28, 2024
281 changes: 257 additions & 24 deletions packages/async-flow/src/replay-membrane.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
/* eslint-disable no-use-before-define */
import { Fail, X, b, makeError, q } from '@endo/errors';
import { isPromise } from '@endo/promise-kit';
import { Far, Remotable, getInterfaceOf } from '@endo/pass-style';
import {
Far,
Remotable,
getInterfaceOf,
getTag,
makeTagged,
passStyleOf,
} from '@endo/pass-style';
import { E } from '@endo/eventual-send';
import { throwLabeled } from '@endo/common/throw-labeled.js';
import { heapVowE } from '@agoric/vow/vat.js';
import { getMethodNames } from '@endo/eventual-send/utils.js';
import { objectMap } from '@endo/common/object-map.js';
import { isVow } from '@agoric/vow/src/vow-utils.js';
import { makeEquate } from './equate.js';
import { makeConvertKit } from './convert.js';

/**
* @import {PromiseKit} from '@endo/promise-kit'
* @import {Vow, VowTools} from '@agoric/vow'
* @import {Passable, PassableCap, CopyTagged} from '@endo/pass-style'
* @import {Vow, VowTools, VowKit} from '@agoric/vow'
* @import {LogStore} from '../src/log-store.js';
* @import {Bijection} from '../src/bijection.js';
* @import {Host, HostVow, LogEntry, Outcome} from '../src/types.js';
Expand All @@ -33,7 +43,7 @@ export const makeReplayMembrane = ({
watchWake,
panic,
}) => {
const { when, watch } = vowTools;
const { when, watch, makeVowKit } = vowTools;

const equate = makeEquate(bijection);

Expand Down Expand Up @@ -127,18 +137,50 @@ export const makeReplayMembrane = ({

// ///////////// Guest to Host or consume log ////////////////////////////////

/**
* The host is not supposed to expose host-side promises to the membrane,
* since they cannot be stored durably or survive upgrade. We cannot just
* automatically wrap any such host promises with host vows, because that
* would mask upgrade hazards if an upgrade happens before the vow settles.
* However, during the transition, the current host APIs called by
* orchestration still return many promises. We want to generate diagnostics
* when we encounter them, but for now, automatically convert them to
* host vow anyway, just so integration testing can proceed to reveal
* additional problems beyond these.
*
* @param {Passable} h
*/
const tolerateHostPromiseToVow = h => {
erights marked this conversation as resolved.
Show resolved Hide resolved
if (isPromise(h)) {
const e = Error('where warning happened');
console.log('Warning for now: vow expected, not promise', h, e);
// TODO remove this stopgap. Here for now because host-side
// promises are everywhere!
// Note: A good place to set a breakpoint, or to uncomment the
// `debugger;` line, to work around bundling.
// debugger;
return watch(h);
} else {
return h;
const passStyle = passStyleOf(h);
switch (passStyle) {
case 'promise': {
const e = Error('where warning happened');
console.log('Warning for now: vow expected, not promise', h, e);
// TODO remove this stopgap. Here for now because host-side
// promises are everywhere!
// Note: A good place to set a breakpoint, or to uncomment the
// `debugger;` line, to work around bundling.
// debugger;
return watch(h);
}
case 'copyRecord': {
const o = /** @type {object} */ (h);
return objectMap(o, tolerateHostPromiseToVow);
}
case 'copyArray': {
const a = /** @type {Array} */ (h);
return harden(a.map(tolerateHostPromiseToVow));
}
case 'tagged': {
const t = /** @type {CopyTagged} */ (h);
if (isVow(t)) {
return h;
}
return makeTagged(getTag(t), tolerateHostPromiseToVow(t.payload));
}
default: {
return h;
}
}
};

Expand All @@ -150,6 +192,7 @@ export const makeReplayMembrane = ({
: hostTarget(...hostArgs);
// This is a temporary kludge anyway. But note that it only
// catches the case where the promise is at the top of hostResult.
harden(hostResult);
hostResult = tolerateHostPromiseToVow(hostResult);
// Try converting here just to route the error correctly
hostToGuest(hostResult, `converting ${optVerb || 'host'} result`);
Expand Down Expand Up @@ -233,14 +276,193 @@ export const makeReplayMembrane = ({

// //////////////// Eventual Send ////////////////////////////////////////////

/**
* @param {PassableCap} hostTarget
* @param {string | undefined} optVerb
* @param {Passable[]} hostArgs
*/
const performSendOnly = (hostTarget, optVerb, hostArgs) => {
try {
optVerb
? heapVowE.sendOnly(hostTarget)[optVerb](...hostArgs)
: // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error
// @ts-ignore once we changed this from E to heapVowE,
// typescript started complaining that heapVowE(hostTarget)
// is not callable. I'm not sure if this is a just a typing bug
// in heapVowE or also reflects a runtime deficiency. But this
// case it not used yet anyway. We disable it
// with at-ts-ignore rather than at-ts-expect-error because
// the dependency-graph tests complains that the latter is unused.
heapVowE.sendOnly(hostTarget)(...hostArgs);
} catch (hostProblem) {
throw Panic`internal: eventual sendOnly synchrously failed ${hostProblem}`;
}
};

/**
* @param {PassableCap} hostTarget
* @param {string | undefined} optVerb
* @param {Passable[]} hostArgs
* @param {number} callIndex
* @param {VowKit} hostResultKit
* @param {Promise} guestReturnedP
* @returns {Outcome}
*/
const performSend = (
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
) => {
const { vow, resolver } = hostResultKit;
try {
const hostPromise = optVerb
? heapVowE(hostTarget)[optVerb](...hostArgs)
: // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error
// @ts-ignore once we changed this from E to heapVowE,
// typescript started complaining that heapVowE(hostTarget)
// is not callable. I'm not sure if this is a just a typing bug
// in heapVowE or also reflects a runtime deficiency. But this
// case it not used yet anyway. We disable it
// with at-ts-ignore rather than at-ts-expect-error because
// the dependency-graph tests complains that the latter is unused.
heapVowE(hostTarget)(...hostArgs);
resolver.resolve(hostPromise); // TODO does this always work?
} catch (hostProblem) {
throw Panic`internal: eventual send synchrously failed ${hostProblem}`;
}
try {
const entry = harden(['doReturn', callIndex, vow]);
log.pushEntry(entry);
const guestPromise = makeGuestForHostVow(vow, guestReturnedP);
// Note that `guestPromise` is not registered in the bijection since
// guestReturnedP is already the guest for vow. Rather, the handler
// returns guestPromise to resolve guestReturnedP to guestPromise.
doReturn(callIndex, vow);
return harden({
kind: 'return',
result: guestPromise,
});
} catch (problem) {
throw panic(problem);
}
};

const guestHandler = harden({
applyMethodSendOnly(guestTarget, optVerb, guestArgs) {
const callIndex = log.getIndex();
if (stopped || !bijection.hasGuest(guestTarget)) {
Fail`Sent from a previous run: ${guestTarget}`;
}
try {
const guestEntry = harden([
'checkSendOnly',
guestTarget,
optVerb,
guestArgs,
callIndex,
]);
if (log.isReplaying()) {
const entry = log.nextEntry();
try {
equate(guestEntry, entry);
} catch (equateErr) {
// TODO consider Richard Gibson's suggestion for a better way
// to keep track of the error labeling.
throwLabeled(
equateErr,
`replay ${callIndex}:
${q(guestEntry)}
vs ${q(entry)}
`,
);
}
} else {
const entry = guestToHost(guestEntry);
log.pushEntry(entry);
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry;
performSendOnly(hostTarget, optVerb, hostArgs);
}
} catch (fatalError) {
throw panic(fatalError);
}
},
applyMethod(guestTarget, optVerb, guestArgs, guestReturnedP) {
if (optVerb === undefined) {
throw Panic`guest eventual call not yet supported: ${guestTarget}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
} else {
throw Panic`guest eventual send not yet supported: ${guestTarget}.${b(optVerb)}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
const callIndex = log.getIndex();
if (stopped || !bijection.hasGuest(guestTarget)) {
Fail`Sent from a previous run: ${guestTarget}`;
}
const hostResultKit = makeVowKit();
const g = bijection.unwrapInit(guestReturnedP, hostResultKit.vow);
erights marked this conversation as resolved.
Show resolved Hide resolved
g === guestReturnedP ||
Fail`internal: guestReturnedP should not unwrap: ${g} vs ${guestReturnedP}`;
/** @type {Outcome} */
let outcome;
try {
const guestEntry = harden([
'checkSend',
guestTarget,
optVerb,
guestArgs,
callIndex,
]);
if (log.isReplaying()) {
const entry = log.nextEntry();
try {
equate(guestEntry, entry);
} catch (equateErr) {
// TODO consider Richard Gibson's suggestion for a better way
// to keep track of the error labeling.
throwLabeled(
equateErr,
`replay ${callIndex}:
${q(guestEntry)}
vs ${q(entry)}
`,
);
}
outcome = /** @type {Outcome} */ (nestInterpreter(callIndex));
} else {
const entry = guestToHost(guestEntry);
log.pushEntry(entry);
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry;
nestInterpreter(callIndex);
outcome = performSend(
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
);
}
} catch (fatalError) {
throw panic(fatalError);
}

switch (outcome.kind) {
case 'return': {
return outcome.result;
}
case 'throw': {
throw outcome.problem;
}
default: {
// @ts-expect-error TS correctly knows this case would be outside
// the type. But that's what we want to check.
throw Panic`unexpected outcome kind ${q(outcome.kind)}`;
}
}
},
applyFunctionSendOnly(guestTarget, guestArgs) {
return guestHandler.applyMethodSendOnly(
guestTarget,
undefined,
guestArgs,
);
},
applyFunction(guestTarget, guestArgs, guestReturnedP) {
return guestHandler.applyMethod(
guestTarget,
Expand All @@ -249,6 +471,9 @@ export const makeReplayMembrane = ({
guestReturnedP,
);
},
getSendOnly(guestTarget, prop) {
throw Panic`guest eventual getSendOnly not yet supported: ${guestTarget}.${b(prop)}`;
},
get(guestTarget, prop, guestReturnedP) {
throw Panic`guest eventual get not yet supported: ${guestTarget}.${b(prop)} -> ${b(guestReturnedP)}`;
},
Expand Down Expand Up @@ -340,13 +565,21 @@ export const makeReplayMembrane = ({

/**
* @param {Vow} hVow
* @returns {unknown}
* @param {Promise} [promiseKey]
* If provided, use this promise as the key in the guestPromiseMap
* rather than the returned promise. This only happens when the
* promiseKey ends up forwarded to the returned promise anyway, so
* associating it with this resolve/reject pair is not incorrect.
* It is needed when `promiseKey` is also entered into the bijection
* paired with hVow.
* @returns {Promise}
*/
const makeGuestForHostVow = hVow => {
const makeGuestForHostVow = (hVow, promiseKey = undefined) => {
hVow = tolerateHostPromiseToVow(hVow);
isVow(hVow) || Fail`vow expected ${hVow}`;
const { promise, resolve, reject } = makeGuestPromiseKit();
guestPromiseMap.set(promise, harden({ resolve, reject }));
promiseKey ??= promise;
guestPromiseMap.set(promiseKey, harden({ resolve, reject }));

watchWake(hVow);

Expand All @@ -370,7 +603,7 @@ export const makeReplayMembrane = ({
hVow,
async hostFulfillment => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doFulfill', hVow, hostFulfillment]);
log.pushEntry(entry);
Expand All @@ -385,7 +618,7 @@ export const makeReplayMembrane = ({
},
async hostReason => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doReject', hVow, hostReason]);
log.pushEntry(entry);
Expand Down
Loading
Loading