Skip to content

Commit

Permalink
Merge pull request #5141 from WalletConnect/feat/link-mode
Browse files Browse the repository at this point in the history
feat: link mode
  • Loading branch information
ganchoradkov authored Sep 6, 2024
2 parents 67bdcd8 + 4bde231 commit 3f74884
Show file tree
Hide file tree
Showing 30 changed files with 937 additions and 169 deletions.
5 changes: 5 additions & 0 deletions packages/core/src/constants/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ export const RELAYER_SDK_VERSION = "2.15.3";

// delay to wait before closing the transport connection after init if not active
export const RELAYER_TRANSPORT_CUTOFF = 10_000;

export const TRANSPORT_TYPES = {
link_mode: "link_mode",
relay: "relay",
} as const;
1 change: 1 addition & 0 deletions packages/core/src/constants/store.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export const STORE_STORAGE_VERSION = "0.3";

export const WALLETCONNECT_CLIENT_ID = "WALLETCONNECT_CLIENT_ID";
export const WALLETCONNECT_LINK_MODE_APPS = "WALLETCONNECT_LINK_MODE_APPS";
28 changes: 22 additions & 6 deletions packages/core/src/controllers/crypto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ import {
validateEncoding,
validateDecoding,
isTypeOneEnvelope,
isTypeTwoEnvelope,
encodeTypeTwoEnvelope,
decodeTypeTwoEnvelope,
deserialize,
decodeTypeByte,
BASE16,
BASE64,
} from "@walletconnect/utils";
import { toString } from "uint8arrays";

Expand Down Expand Up @@ -108,28 +112,37 @@ export class Crypto implements ICrypto {
this.isInitialized();
const params = validateEncoding(opts);
const message = safeJsonStringify(payload);

if (isTypeTwoEnvelope(params)) {
return encodeTypeTwoEnvelope(message, opts?.encoding);
}

if (isTypeOneEnvelope(params)) {
const selfPublicKey = params.senderPublicKey;
const peerPublicKey = params.receiverPublicKey;
topic = await this.generateSharedKey(selfPublicKey, peerPublicKey);
}
const symKey = this.getSymKey(topic);
const { type, senderPublicKey } = params;
const result = encrypt({ type, symKey, message, senderPublicKey });
const result = encrypt({ type, symKey, message, senderPublicKey, encoding: opts?.encoding });
return result;
};

public decode: ICrypto["decode"] = async (topic, encoded, opts) => {
this.isInitialized();
const params = validateDecoding(encoded, opts);
if (isTypeTwoEnvelope(params)) {
const message = decodeTypeTwoEnvelope(encoded, opts?.encoding);
return safeJsonParse(message);
}
if (isTypeOneEnvelope(params)) {
const selfPublicKey = params.receiverPublicKey;
const peerPublicKey = params.senderPublicKey;
topic = await this.generateSharedKey(selfPublicKey, peerPublicKey);
}
try {
const symKey = this.getSymKey(topic);
const message = decrypt({ symKey, encoded });
const message = decrypt({ symKey, encoded, encoding: opts?.encoding });
const payload = safeJsonParse(message);
return payload;
} catch (error) {
Expand All @@ -140,13 +153,16 @@ export class Crypto implements ICrypto {
}
};

public getPayloadType: ICrypto["getPayloadType"] = (encoded) => {
const deserialized = deserialize(encoded);
public getPayloadType: ICrypto["getPayloadType"] = (encoded, encoding = BASE64) => {
const deserialized = deserialize({ encoded, encoding });
return decodeTypeByte(deserialized.type);
};

public getPayloadSenderPublicKey: ICrypto["getPayloadSenderPublicKey"] = (encoded) => {
const deserialized = deserialize(encoded);
public getPayloadSenderPublicKey: ICrypto["getPayloadSenderPublicKey"] = (
encoded,
encoding = BASE64,
) => {
const deserialized = deserialize({ encoded, encoding });
return deserialized.senderPublicKey
? toString(deserialized.senderPublicKey, BASE16)
: undefined;
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/controllers/pairing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
PAIRING_EVENTS,
EVENT_CLIENT_PAIRING_TRACES,
EVENT_CLIENT_PAIRING_ERRORS,
TRANSPORT_TYPES,
} from "../constants";
import { Store } from "../controllers/store";

Expand Down Expand Up @@ -107,7 +108,7 @@ export class Pairing implements IPairing {
});
this.core.expirer.set(topic, expiry);
await this.pairings.set(topic, pairing);
await this.core.relayer.subscribe(topic);
await this.core.relayer.subscribe(topic, { transportType: params?.transportType });

return { topic, uri };
};
Expand Down Expand Up @@ -288,11 +289,14 @@ export class Pairing implements IPairing {

private registerRelayerEvents() {
this.core.relayer.on(RELAYER_EVENTS.message, async (event: RelayerTypes.MessageEvent) => {
const { topic, message } = event;
const { topic, message, transportType } = event;

// Do not handle if the topic is not related to known pairing topics.
if (!this.pairings.keys.includes(topic)) return;

// Do not handle link-mode messages
if (transportType === TRANSPORT_TYPES.link_mode) return;

// messages of certain types should be ignored as they are handled by their respective SDKs
if (this.ignoredPayloadTypes.includes(this.core.crypto.getPayloadType(message))) return;

Expand Down
55 changes: 45 additions & 10 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ import {
Logger,
} from "@walletconnect/logger";
import { RelayJsonRpc } from "@walletconnect/relay-api";
import { ONE_MINUTE, ONE_SECOND, THIRTY_SECONDS, toMiliseconds } from "@walletconnect/time";
import {
FIVE_MINUTES,
ONE_MINUTE,
ONE_SECOND,
THIRTY_SECONDS,
toMiliseconds,
} from "@walletconnect/time";
import {
ICore,
IMessageTracker,
Expand All @@ -38,6 +44,7 @@ import {
getBundleId,
getInternalError,
isNode,
calcExpiry,
} from "@walletconnect/utils";

import {
Expand All @@ -50,7 +57,7 @@ import {
RELAYER_DEFAULT_RELAY_URL,
SUBSCRIBER_EVENTS,
RELAYER_RECONNECT_TIMEOUT,
RELAYER_TRANSPORT_CUTOFF,
TRANSPORT_TYPES,
} from "../constants";
import { MessageTracker } from "./messages";
import { Publisher } from "./publisher";
Expand Down Expand Up @@ -118,15 +125,15 @@ export class Relayer extends IRelayer {
this.logger.trace(`Initialized`);
this.registerEventListeners();
await Promise.all([this.messages.init(), this.subscriber.init()]);
await this.transportOpen();
this.initialized = true;
setTimeout(async () => {
if (this.subscriber.topics.length === 0 && this.subscriber.pending.size === 0) {
this.logger.info(`No topics subscribed to after init, closing transport`);
await this.transportClose();
this.transportExplicitlyClosed = false;
// @ts-expect-error - .cached is private
if (this.subscriber.cached.length > 0) {
try {
await this.transportOpen();
} catch (e) {
this.logger.warn(e);
}
}, RELAYER_TRANSPORT_CUTOFF);
}
}

get context() {
Expand All @@ -151,11 +158,15 @@ export class Relayer extends IRelayer {
message,
// We don't have `publishedAt` from the relay server on outgoing, so use current time to satisfy type.
publishedAt: Date.now(),
transportType: TRANSPORT_TYPES.relay,
});
}

public async subscribe(topic: string, opts?: RelayerTypes.SubscribeOptions) {
this.isInitialized();
if (opts?.transportType === "relay") {
await this.toEstablishConnection();
}
let id = this.subscriber.topicMap.get(topic)?.[0] || "";
let resolvePromise: () => void;
const onSubCreated = (subscription: SubscriberTypes.Active) => {
Expand Down Expand Up @@ -279,9 +290,11 @@ export class Relayer extends IRelayer {
this.relayUrl = relayUrl;
await this.transportDisconnect();
}

// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
await this.createProvider();

this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
try {
Expand Down Expand Up @@ -353,6 +366,22 @@ export class Relayer extends IRelayer {
this.logger.trace(`Batch of ${sortedMessages.length} message events processed`);
}

public async onLinkMessageEvent(
messageEvent: RelayerTypes.MessageEvent,
opts: { sessionExists: boolean },
) {
const { topic } = messageEvent;

if (!opts.sessionExists) {
const expiry = calcExpiry(FIVE_MINUTES);
const pairing = { topic, expiry, relay: { protocol: "irn" }, active: false };
await this.core.pairing.pairings.set(topic, pairing);
}

this.events.emit(RELAYER_EVENTS.message, messageEvent);
await this.recordMessageEvent(messageEvent);
}

// ---------- Private ----------------------------------------------- //
/*
* In Node, we must detect when the connection is stalled and terminate it.
Expand Down Expand Up @@ -454,7 +483,13 @@ export class Relayer extends IRelayer {
if (!payload.method.endsWith(RELAYER_SUBSCRIBER_SUFFIX)) return;
const event = (payload as JsonRpcRequest<RelayJsonRpc.SubscriptionParams>).params;
const { topic, message, publishedAt, attestation } = event.data;
const messageEvent: RelayerTypes.MessageEvent = { topic, message, publishedAt, attestation };
const messageEvent: RelayerTypes.MessageEvent = {
topic,
message,
publishedAt,
transportType: TRANSPORT_TYPES.relay,
attestation,
};
this.logger.debug(`Emitting Relayer Payload`);
this.logger.trace({ type: "event", event: event.id, ...messageEvent });
this.events.emit(event.id, messageEvent);
Expand Down
40 changes: 32 additions & 8 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { HEARTBEAT_EVENTS } from "@walletconnect/heartbeat";
import { ErrorResponse, RequestArguments } from "@walletconnect/jsonrpc-types";
import { generateChildLogger, getLoggerContext, Logger } from "@walletconnect/logger";
import { RelayJsonRpc } from "@walletconnect/relay-api";
import { ONE_MINUTE, Watch, toMiliseconds } from "@walletconnect/time";
import { ONE_SECOND, ONE_MINUTE, Watch, toMiliseconds } from "@walletconnect/time";
import {
IRelayer,
ISubscriber,
Expand All @@ -27,6 +27,7 @@ import {
SUBSCRIBER_STORAGE_VERSION,
PENDING_SUB_RESOLUTION_TIMEOUT,
RELAYER_EVENTS,
TRANSPORT_TYPES,
} from "../constants";
import { SubscriberTopicMap } from "./topicmap";

Expand Down Expand Up @@ -61,7 +62,9 @@ export class Subscriber extends ISubscriber {
this.logger.trace(`Initialized`);
this.registerEventListeners();
this.clientId = await this.relayer.core.crypto.getClientId();
await this.restore();
}
this.initialized = true;
};

get context() {
Expand Down Expand Up @@ -91,15 +94,14 @@ export class Subscriber extends ISubscriber {
}

public subscribe: ISubscriber["subscribe"] = async (topic, opts) => {
await this.restartToComplete();
this.isInitialized();
this.logger.debug(`Subscribing Topic`);
this.logger.trace({ type: "method", method: "subscribe", params: { topic, opts } });
try {
const relay = getRelayProtocolName(opts);
const params = { topic, relay };
const params = { topic, relay, transportType: opts?.transportType };
this.pending.set(topic, params);
const id = await this.rpcSubscribe(topic, relay);
const id = await this.rpcSubscribe(topic, relay, opts?.transportType);
if (typeof id === "string") {
this.onSubscribe(id, params);
this.logger.debug(`Successfully Subscribed Topic`);
Expand Down Expand Up @@ -217,7 +219,14 @@ export class Subscriber extends ISubscriber {
}
}

private async rpcSubscribe(topic: string, relay: RelayerTypes.ProtocolOptions) {
private async rpcSubscribe(
topic: string,
relay: RelayerTypes.ProtocolOptions,
transportType: RelayerTypes.TransportType = TRANSPORT_TYPES.relay,
) {
if (transportType === TRANSPORT_TYPES.relay) {
await this.restartToComplete();
}
const api = getRelayProtocolApi(relay.protocol);
const request: RequestArguments<RelayJsonRpc.SubscribeParams> = {
method: api.subscribe,
Expand All @@ -228,13 +237,25 @@ export class Subscriber extends ISubscriber {
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "payload", direction: "outgoing", request });
try {
const subId = hashMessage(topic + this.clientId);
// in link mode, allow the app to update its network state (i.e. active airplane mode) with small delay before attempting to subscribe
if (transportType === TRANSPORT_TYPES.link_mode) {
setTimeout(() => {
if (this.relayer.connected || this.relayer.connecting) {
this.relayer.request(request).catch((e) => this.logger.warn(e));
}
}, toMiliseconds(ONE_SECOND));
return subId;
}

const subscribe = await createExpiringPromise(
this.relayer.request(request).catch((e) => this.logger.warn(e)),
this.subscribeTimeout,
);
const result = await subscribe;

// return null to indicate that the subscription failed
return result ? hashMessage(topic + this.clientId) : null;
return result ? subId : null;
} catch (err) {
this.logger.debug(`Outgoing Relay Subscribe Payload stalled`);
this.relayer.events.emit(RELAYER_EVENTS.connection_stalled);
Expand All @@ -245,7 +266,7 @@ export class Subscriber extends ISubscriber {
private async rpcBatchSubscribe(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay.protocol);
const api = getRelayProtocolApi(relay!.protocol);
const request: RequestArguments<RelayJsonRpc.BatchSubscribeParams> = {
method: api.batchSubscribe,
params: {
Expand All @@ -268,7 +289,7 @@ export class Subscriber extends ISubscriber {
private async rpcBatchFetchMessages(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay.protocol);
const api = getRelayProtocolApi(relay!.protocol);
const request: RequestArguments<RelayJsonRpc.BatchFetchMessagesParams> = {
method: api.batchFetchMessages,
params: {
Expand Down Expand Up @@ -488,6 +509,9 @@ export class Subscriber extends ISubscriber {
}

private async restartToComplete() {
if (!this.relayer.connected && !this.relayer.connecting) {
await this.relayer.transportOpen();
}
if (!this.restartInProgress) return;

await new Promise<void>((resolve) => {
Expand Down
Loading

0 comments on commit 3f74884

Please sign in to comment.