Skip to content

Commit

Permalink
try it out
Browse files Browse the repository at this point in the history
  • Loading branch information
acdibble committed Jun 18, 2024
1 parent b7ad5cb commit e7dc4e5
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 69 deletions.
1 change: 1 addition & 0 deletions bouncer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"ethers": "6.8.0",
"md5": "^2.3.0",
"minimist": "^1.2.8",
"rxjs": "^7.8.1",
"tiny-secp256k1": "^2.2.1",
"toml": "^3.0.0",
"web3": "^1.9.0",
Expand Down
3 changes: 3 additions & 0 deletions bouncer/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

149 changes: 80 additions & 69 deletions bouncer/shared/utils/substrate.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'disposablestack/auto';
import { ApiPromise, WsProvider } from '@polkadot/api';
import { Observable, Subject } from 'rxjs';
import { deferredPromise } from '../utils';

// @ts-expect-error polyfilling
Expand Down Expand Up @@ -116,89 +117,104 @@ export type Event<T = any> = {
eventIndex: number;
};

async function* subscribeHeads({
chain,
finalized = false,
signal,
}: {
chain: SubstrateChain;
finalized?: boolean;
signal?: AbortSignal;
}) {
// take the correct substrate API
await using api = await apiMap[chain]();
// prepare a stack for cleanup
using stack = new DisposableStack();

// subscribe to the correct head based on the finalized flag
const subscribe = finalized
? api.rpc.chain.subscribeFinalizedHeads
: api.rpc.chain.subscribeNewHeads;

// async generator is pull-based, but the subscribe new heads is push-based
async function* observableToIterable<T>(observer: Observable<T>, signal?: AbortSignal) {
// async generator is pull-based, but the observable is push-based
// if the consumer takes too long, we need to buffer the events
const buffer: Event[][] = [];
const buffer: T[] = [];

// yield the first batch of events via a promise because it is asynchronous
let promise: Promise<Event[]> | undefined;
let resolve: ((value: Event[]) => void) | undefined;
let promise: Promise<T | null> | undefined;
let resolve: ((value: T | null) => void) | undefined;
let reject: ((error: Error) => void) | undefined;
({ resolve, promise, reject } = deferredPromise<T | null>());
let done = false;

signal?.addEventListener('abort', () => {
reject?.(new Error('Aborted'));
});

({ resolve, promise, reject } = deferredPromise<Event[]>());

const unsubscribe = await subscribe(async (header) => {
const historicApi = await api.at(header.hash);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const rawEvents = (await historicApi.query.system.events()) as unknown as any[];
const events: Event[] = [];

// iterate over all the events, reshaping them for the consumer
rawEvents.forEach(({ event }, index) => {
events.push({
name: { section: event.section, method: event.method },
data: event.toHuman().data,
block: header.number.toNumber(),
eventIndex: index,
});
});
const complete = () => {
done = true;
resolve?.(null);
};
signal?.addEventListener('abort', complete, { once: true });

// if we haven't consumed the promise yet, resolve it and prepare the for
// the next batch, otherwise begin buffering the events
if (resolve) {
resolve(events);
promise = undefined;
resolve = undefined;
reject = undefined;
} else {
buffer.push(events);
}
const sub = observer.subscribe({
error: (error) => {
reject?.(error);
},
next: (value) => {
// if we haven't consumed the promise yet, resolve it and prepare the for
// the next batch, otherwise begin buffering the events
if (resolve) {
resolve(value);
promise = undefined;
resolve = undefined;
reject = undefined;
} else {
buffer.push(value);
}
},
complete,
});

// automatic cleanup!
stack.defer(unsubscribe);

while (true) {
while (!done) {
const next = await promise.catch(() => null);

// yield the first batch
if (next === null) break;
yield* next;
yield next;

// if the consume took too long, yield the buffered events
while (buffer.length !== 0) {
yield* buffer.shift()!;
yield buffer.shift()!;
}

// reset for the next batch
({ resolve, promise, reject } = deferredPromise<Event[]>());
({ resolve, promise, reject } = deferredPromise<T | null>());
}

sub.unsubscribe();
}

const subscribeHeads = getCachedDisposable(
async ({ chain, finalized = false }: { chain: SubstrateChain; finalized?: boolean }) => {
// prepare a stack for cleanup
const stack = new AsyncDisposableStack();
const api = stack.use(await apiMap[chain]());
// take the correct substrate API

// subscribe to the correct head based on the finalized flag
const subscribe = finalized
? api.rpc.chain.subscribeFinalizedHeads
: api.rpc.chain.subscribeNewHeads;
const subject = new Subject<Event>();

const unsubscribe = await subscribe(async (header) => {
const historicApi = await api.at(header.hash);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const rawEvents = (await historicApi.query.system.events()) as unknown as any[];

// iterate over all the events, reshaping them for the consumer
rawEvents.forEach(({ event }, index) => {
subject.next({
name: { section: event.section, method: event.method },
data: event.toHuman().data,
block: header.number.toNumber(),
eventIndex: index,
});
});
});

// automatic cleanup!
stack.defer(unsubscribe);

return {
observable: subject as Observable<Event>,
[Symbol.asyncDispose]() {
return stack.disposeAsync();
},
};
},
);

type EventTest<T> = (event: Event<T>) => boolean;

interface BaseOptions<T> {
Expand Down Expand Up @@ -248,14 +264,9 @@ export function observeEvent<T = any>(

const controller = abortable ? new AbortController() : undefined;

const it = subscribeHeads({ chain, finalized });

controller?.signal.addEventListener('abort', () => {
/* eslint-disable-next-line @typescript-eslint/no-floating-promises */
it.return();
});

const findEvent = async () => {
await using subscription = await subscribeHeads({ chain, finalized });
const it = observableToIterable(subscription.observable, controller?.signal);
monitor += 1;
console.log({ 'monitoring events': monitor });
try {
Expand Down

0 comments on commit e7dc4e5

Please sign in to comment.