Skip to content

Commit

Permalink
feat: remote procedure call through inxedb and rxdb
Browse files Browse the repository at this point in the history
  • Loading branch information
sanchezcarlosjr committed Jul 16, 2023
1 parent f0fae1b commit e0dad91
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
52 changes: 41 additions & 11 deletions src/app/notebook/shell/documentObserver.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import {concatMap, filter, firstValueFrom, Observable, tap} from "rxjs";
import {concatMap, distinctUntilChanged, filter, firstValueFrom, from, Observable, skip, tap} from "rxjs";
import {RxDatabase} from "rxdb/dist/types/types";
import {RxDocument} from "rxdb";
import {randomCouchString} from "rxdb/plugins/utils";

type EventLoop = { arguments: any, id: string }[];

export class DocumentObserver {
private tasks: Promise<any>[] = [];
public readonly rpc = new Proxy({}, {
get: (target, prop, receiver) =>
async (args: any) => this.call(prop.toString(), args)
});

constructor(
private documentId: string,
Expand All @@ -13,12 +20,7 @@ export class DocumentObserver {
}

wait() {
return new Promise<boolean>(
resolve => queueMicrotask(async () => {
await Promise.allSettled(this.tasks);
resolve(true);
})
);
return Promise.allSettled(this.tasks);
}

static setup(db: Observable<RxDatabase>, documentId: string = 'environment', collection: string = 'view') {
Expand All @@ -45,11 +47,39 @@ export class DocumentObserver {
);
}

// TODO: This should be a decorator. https://github.com/tc39/proposal-decorators
// TODO: We must implement Leader election, coordination, synchronization, loader balancer and self-stabilization algorithm as well as E2E encryption.
// Scheduler.
async remoteProcedure(path: string = '', callback: (params: any[]) => any) {
const document = DocumentObserver.setup(this.db, path, this.collection);
await document.set("queue", []);
return (document.get('queue') as Observable<EventLoop>).pipe(
filter(x => x && x.length > 0),
distinctUntilChanged((prev, curr) => prev.length === curr.length),
concatMap(async queue => {
const request = queue[queue.length-1];
const response = await callback(request.arguments);
await document.set(request.id, response);
}
)
)
}

async call(method:string, args?: any) {
if (args && 'toJs' in args && typeof args.toJs === 'function') {
args = args.toJs();
}
const document = DocumentObserver.setup(this.db, method, this.collection);
const id = randomCouchString(10);
await document.push('queue', {id, arguments: args});
return firstValueFrom(document.get(id).pipe(skip(1)));
}

createProxy() {
return new Proxy(this, {
get(target: DocumentObserver, p: string | symbol, receiver: any): any {
if (typeof p === 'string' && p[p.length - 1] === '$') {
return target.get(p.slice(0, -1));
get(target: DocumentObserver, property: string | symbol, receiver: any): any {
if (typeof property === 'string' && property[property.length - 1] === '$') {
return target.get(property.slice(0, -1));
}
// @ts-ignore
return Reflect.get(...arguments);
Expand Down Expand Up @@ -86,7 +116,7 @@ export class DocumentObserver {
return result._data[p];
}

inc(p: string | symbol, newValue: any) {
inc(p: string | symbol, newValue: number = 1) {
return this.set(p, newValue, '$inc');
}

Expand Down
7 changes: 7 additions & 0 deletions src/app/notebook/shell/process.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ class ProcessWorker {
}).pipe(shareReplay(1));
this.environmentObserver = new DocumentObserver("environment", environment.db);
environment.environment = this.environmentObserver.createProxy();
environment.setupDocumentObserver = (document: string) => DocumentObserver.setup(environment.db, document);
environment.hos = new HostOperatingSystem(environment);
environment.hfs = new HostFilesystem(environment.hos);
environment.exit = (message: string = "0") => {
Expand All @@ -807,6 +808,12 @@ class ProcessWorker {
environment.P = P;
environment.editor = new EditorJS(this.environment);
environment.from = from;
environment.sum = (acc: any, value: any) => {
acc.push(value);
return acc;
}
environment.reduceWithSum = reduce(environment.sum , []);
environment.scanWithSum = scan(environment.sum, []);
environment.Pattern = Pattern;
environment.isMatching = isMatching;
environment.match = match;
Expand Down

0 comments on commit e0dad91

Please sign in to comment.