Skip to content

Commit

Permalink
client-web: improve subscription management
Browse files Browse the repository at this point in the history
  • Loading branch information
franzos committed Aug 23, 2023
1 parent 09ee6e5 commit 0e55dc9
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 56 deletions.
4 changes: 0 additions & 4 deletions client-web/src/layouts/primary.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ export function PrimaryLayout() {
};

useEffect(() => {
const init = async () => {
await update();
};
init();
const statsUpdateInterval = setInterval(update, 1000);

return () => clearInterval(statsUpdateInterval);
Expand Down
9 changes: 8 additions & 1 deletion client-web/src/routes/subscriptions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ export function SubscriptionsRoute() {
<Td>
<Tooltip label={sub.id}>{`${sub.id.substring(0, 3)}..`}</Tooltip>
</Td>
<Td>
<Tooltip label={sub.relayId}>{`${sub.relayId.substring(
0,
3
)}..`}</Tooltip>
</Td>
<Td>{sub.filters.kinds && kindsToName(sub.filters.kinds)}</Td>
<Td>
<Button
Expand All @@ -109,7 +115,7 @@ export function SubscriptionsRoute() {
<Td>
<Button
size={"sm"}
onClick={() => useNClient.getState().unsubscribe(sub.id)}
onClick={() => useNClient.getState().unsubscribe([sub.id])}
>
Unsubscribe
</Button>
Expand All @@ -125,6 +131,7 @@ export function SubscriptionsRoute() {
<Thead>
<Tr>
<Th>ID</Th>
<Th>Relay</Th>
<Th>Kind</Th>
<Th>Filter</Th>
<Th>View</Th>
Expand Down
2 changes: 1 addition & 1 deletion client-web/src/state/client-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface NClient extends NClientBase {
relayEvents: WebSocketEvent[];

getSubscriptions: () => Promise<RelaySubscription[]>;
unsubscribe: (id: string) => Promise<void>;
unsubscribe: (ids: string[]) => Promise<void>;
unsubscribeAll: () => Promise<void>;

keystore: "none" | "localstore" | "nos2x" | "download";
Expand Down
45 changes: 20 additions & 25 deletions client-web/src/state/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ export const useNClient = create<NClient>((set, get) => ({
};

const throttledEvents = throttle(processEvents, 100);

worker.addEventListener("message", throttledEvents);

const following = await get().store.getAllUsersFollowing();
if (following) {
set({ followingUserIds: following.map((u) => u.user.pubkey) });
}
},
connected: false,
connect: async (relays?: Relay[]) => {
Expand Down Expand Up @@ -134,9 +138,9 @@ export const useNClient = create<NClient>((set, get) => ({
subscribe: async (payload: SubscriptionRequest) => {
return get().store.subscribe(payload);
},
unsubscribe: async (id: string) => {
console.log(`Unsubscribing ${id}`);
return get().store.unsubscribe(id);
unsubscribe: async (ids: string[]) => {
console.log(`Unsubscribing ${ids}`);
return get().store.unsubscribe(ids);
},
unsubscribeAll: async () => {
console.log(`Unsubscribing all`);
Expand Down Expand Up @@ -512,17 +516,15 @@ export const useNClient = create<NClient>((set, get) => ({
setViewSubscription: async (view: string, filters: NFilters) => {
const subs = await get().getSubscriptions();

const sameView = subs.find((s) => s.options && s.options.view === view);
if (sameView) {
console.log(`Already subscribed to view ${view}`);
return;
}

const subIds = [];
for (const sub of subs) {
if (sub.options && sub.options.view) {
await get().unsubscribe(sub.id);
subIds.push(sub.id);
}
}
if (subIds.length > 0) {
await get().unsubscribe(subIds);
}

const relays = await get().getRelays();

Expand Down Expand Up @@ -617,36 +619,29 @@ export const useNClient = create<NClient>((set, get) => ({
await get().requestInformation(item, {
timeout: 10000,
timeoutAt: Date.now() + 10000,
view,
})
);
}
}, 5000);
}, 4000);
},

/**
* Remove a subscription related to a view
* @param view
* @returns
*/
removeViewSubscription: async (view?: string) => {
removeViewSubscription: async (view: string) => {
const subs = await get().getSubscriptions();

console.log(`Remove view subscription ${view}`);

if (!view) {
const unsubPromises = subs.map((sub) => get().unsubscribe(sub.id));
await Promise.all(unsubPromises);
return;
}

const subsToUnsubscribe = subs.filter(
const filteredSubs = subs.filter(
(sub) => sub.options && sub.options.view === view
);

const unsubPromises = subsToUnsubscribe.map((sub) =>
get().unsubscribe(sub.id)
);

await Promise.all(unsubPromises);
if (filteredSubs.length === 0) {
await get().unsubscribe(filteredSubs.map((sub) => sub.id));
}
},
}));
2 changes: 1 addition & 1 deletion client-web/src/state/worker-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface NClientWorker extends NClientBase {
eventsPublishingQueue: PublishingQueueItem[];

getSubscriptions: () => RelaySubscription[];
unsubscribe: (id: string) => void;
unsubscribe: (ids: string[]) => void;
unsubscribeAll: () => void;

count: (payload: CountRequest) => Subscription[] | undefined;
Expand Down
8 changes: 3 additions & 5 deletions client-web/src/state/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ class WorkerClass implements NClientWorker {
return this.client?.subscribe(payload);
}

unsubscribe(id: string) {
return this.client?.unsubscribe(id);
unsubscribe(ids: string[]) {
return this.client?.unsubscribe(ids);
}

unsubscribeAll() {
Expand Down Expand Up @@ -665,9 +665,7 @@ class WorkerClass implements NClientWorker {

if (subscriptions.length > 0) {
setTimeout(() => {
for (const subscription of subscriptions) {
this.unsubscribe(subscription.id);
}
this.unsubscribe(subscriptions.map((sub) => sub.id));
}, timeout);
}
}
Expand Down
33 changes: 14 additions & 19 deletions packages/common/src/classes/relay-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,6 @@ export class RelayClientBase {
}

const subscriptions = this.sendSubscribe(payload);

// // LEGACY: Track commands on relay connection
// for (const sub of subscriptions) {
// const relay = this.relays.find((r) => r.id === sub.connectionId);
// if (relay) {
// relay.addCommand({
// connectionId: sub.connectionId,
// subscriptionId: sub.id,
// });
// }
// }
return subscriptions;
}

Expand All @@ -139,17 +128,23 @@ export class RelayClientBase {
* @param subscriptionId
* @param relayId: if unsubscribing from a specific relay
*/
unsubscribe(subscriptionId: string) {
unsubscribe(subscriptionIds: string[]) {
for (const relay of this.relays) {
if (!relay.isReady("read")) {
continue;
}

const sub = relay.getSubscription(subscriptionId);
if (sub) {
const subs = relay.getSubscriptions();
if (!subs || subs.length === 0) {
continue;
}

const filtered = subs.filter((s) => subscriptionIds.includes(s.id));

for (const sub of filtered) {
const message: ClientClose = {
type: CLIENT_MESSAGE_TYPE.CLOSE,
subscriptionId,
subscriptionId: sub.id,
};

if (sub.options && sub.options.timeout) {
Expand All @@ -159,7 +154,7 @@ export class RelayClientBase {
relay.ws.sendMessage(
JSON.stringify([message.type, message.subscriptionId])
);
relay.removeSubscription(subscriptionId);
relay.removeSubscription(sub.id);
} catch (e) {
console.error(e);
}
Expand All @@ -172,9 +167,9 @@ export class RelayClientBase {
if (relay.isReady("read")) {
continue;
}
const subscriptions = relay.getSubscriptions();
for (const sub of subscriptions) {
this.unsubscribe(sub.id);
const subs = relay.getSubscriptions();
if (subs && subs.length > 0) {
this.unsubscribe(subs.map((s) => s.id));
}
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/types/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface SubscriptionOptions {
* for ex. `welcome` or `profile:<uuid>
*/
view?: string;
unsubscribeOnEose?: boolean;
}

/**
Expand Down

0 comments on commit 0e55dc9

Please sign in to comment.