Skip to content

Commit

Permalink
feat(node): query peers using ildcp
Browse files Browse the repository at this point in the history
  • Loading branch information
justmoon committed Oct 9, 2024
1 parent 784edf7 commit f986378
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ import type { Reactor } from "@dassie/lib-reactive"
import { UnreachableCaseError } from "@dassie/lib-type-utils"

import { IlpAllocationSchemeSignal } from "../../config/computed/ilp-allocation-scheme"
import { MajorityNodeListSignal } from "../../peer-protocol/computed/majority-node-list"
import { NodeIlpAddressSignal } from "../computed/node-ilp-address"
import type { IlpAddress } from "../types/ilp-address"
import type { EndpointInfo } from "./send-packet"

export function GetEndpointIlpAddress(reactor: Reactor) {
const ilpAllocationSchemeSignal = reactor.use(IlpAllocationSchemeSignal)
const nodeIlpAddressSignal = reactor.use(NodeIlpAddressSignal)
const majorityNodeListSignal = reactor.use(MajorityNodeListSignal)

return function getEndpointIlpAddress(
endpointInfo: EndpointInfo,
): IlpAddress {
switch (endpointInfo.type) {
case "peer": {
return `${ilpAllocationSchemeSignal.read()}.das.${endpointInfo.nodeId}`
return majorityNodeListSignal.read().has(endpointInfo.nodeId) ?
`${ilpAllocationSchemeSignal.read()}.das.${endpointInfo.nodeId}`
: `${nodeIlpAddressSignal.read()}.${endpointInfo.nodeId}`
}
case "ildcp": {
return ILDCP_ADDRESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ import {
} from "../topics/prepared-ilp-packet"
import { getUniqueEndpointId } from "../utils/get-unique-endpoint-id"
import { PendingPacketsMap } from "../values/pending-packets-map"
import { CalculatePreparePacketOutcome } from "./calculate-prepare-packet-outcome"
import {
CalculatePreparePacketOutcome,
type PreparePacketOutcome,
} from "./calculate-prepare-packet-outcome"
import { GetEndpointIlpAddress } from "./get-endpoint-ilp-address"
import type { ProcessIncomingPacketParameters } from "./process-packet"
import { ScheduleTimeout } from "./schedule-timeout"
import { TriggerEarlyRejection } from "./trigger-early-rejection"

interface AdditionalPreparePacketParameters {
predeterminedOutcome?: PreparePacketOutcome | undefined
}

export const ProcessPreparePacket = (reactor: DassieReactor) => {
const preparedIlpPacketTopic = reactor.use(PreparedIlpPacketTopic)
const pendingPacketsMap = reactor.use(PendingPacketsMap)
Expand All @@ -40,7 +47,9 @@ export const ProcessPreparePacket = (reactor: DassieReactor) => {
parsedPacket,
serializedPacket,
requestId,
}: ProcessIncomingPacketParameters<typeof IlpType.Prepare>) {
predeterminedOutcome,
}: ProcessIncomingPacketParameters<typeof IlpType.Prepare> &
AdditionalPreparePacketParameters) {
const outgoingRequestId = Math.trunc(Math.random() * 0xff_ff_ff_ff)

const pendingTransfers: Transfer[] = []
Expand All @@ -64,10 +73,12 @@ export const ProcessPreparePacket = (reactor: DassieReactor) => {
requestId: outgoingRequestId,
})

const packetOutcome = calculatePreparePacketOutcome({
sourceEndpointInfo,
parsedPacket,
})
const packetOutcome =
predeterminedOutcome ??
calculatePreparePacketOutcome({
sourceEndpointInfo,
parsedPacket,
})

if (isFailure(packetOutcome)) {
triggerEarlyRejection({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IlpErrorCode } from "@dassie/lib-protocol-ilp"
import { delayWithAbortSignal } from "@dassie/lib-reactive"
import { isError } from "@dassie/lib-type-utils"
import { isError, isFailure } from "@dassie/lib-type-utils"

import type { DassieReactor } from "../../base/types/dassie-base"
import { connector as logger } from "../../logger/instances"
Expand All @@ -22,7 +22,9 @@ export const ScheduleTimeout = (reactor: DassieReactor) => {
timeoutAbort,
}: ScheduleTimeoutParameters) {
delayWithAbortSignal(reactor.base.clock, 5000, timeoutAbort.signal)
.then(() => {
.then((value) => {
if (isFailure(value)) return

logger.debug?.("ILP packet timed out", { requestId })
triggerLateRejection({
sourceEndpointInfo,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { nanoid } from "nanoid"

import {
type IlpPreparePacket,
type IlpResponsePacket,
IlpType,
serializeIlpPacket,
} from "@dassie/lib-protocol-ilp"
import { createDeferred } from "@dassie/lib-reactive"

import type { AccountPath } from "../../accounting/types/account-paths"
import type { DassieReactor } from "../../base/types/dassie-base"
import { OutstandingRequestsStore } from "../../local-ilp/stores/outstanding-requests"
import type { NodeId } from "../../peer-protocol/types/node-id"
import type { LocalEndpointInfo } from "../senders/send-local-packets"
import type { PeerEndpointInfo } from "../senders/send-peer-packets"
import { ProcessPreparePacket } from "./process-prepare-packet"

interface SendLinkLocalPacketParameters {
packet: IlpPreparePacket
peerId: NodeId
}

export function SendLinkLocalPacket(reactor: DassieReactor) {
const processPreparePacket = reactor.use(ProcessPreparePacket)
const outstandingRequestsStore = reactor.use(OutstandingRequestsStore)
return async function sendLinkLocalPacket({
packet,
peerId,
}: SendLinkLocalPacketParameters): Promise<IlpResponsePacket> {
const requestId = nanoid()
const deferred = createDeferred<IlpResponsePacket>()

if (packet.amount !== 0n) {
throw new Error(
"This function only supports link-local packets with zero amount",
)
}

outstandingRequestsStore.act.addRequest(requestId, deferred.resolve)

const parsedPacket = {
type: IlpType.Prepare,
data: packet,
}

const sourceEndpointInfo: LocalEndpointInfo = {
type: "local",
hint: "Link Local Sender",
localIlpAddressPart: "link-local",
}

const destinationEndpointInfo: PeerEndpointInfo = {
type: "peer",
nodeId: peerId,
accountPath: "invalid" as AccountPath,
}

processPreparePacket({
sourceEndpointInfo,
parsedPacket,
serializedPacket: serializeIlpPacket(parsedPacket),
requestId,
predeterminedOutcome: {
destinationEndpointInfo,
outgoingAmount: 0n,
outgoingExpiry: packet.expiresAt,
transfers: [],
},
})

return deferred
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Failure } from "@dassie/lib-type-utils"

export default class UplinkAddressQueryFailure extends Failure {
readonly name = "UplinkAddressQueryFailure"

constructor(public readonly message: string) {
super()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {
ILDCP_ADDRESS,
ILDCP_CONDITION,
parseIldcpResponse,
} from "@dassie/lib-protocol-ildcp"
import { IlpType, timestampToInterledgerTime } from "@dassie/lib-protocol-ilp"
import { isFailure } from "@dassie/lib-type-utils"

import type { DassieReactor } from "../../base/types/dassie-base"
import { MAXIMUM_HOLD_TIME } from "../../ilp-connector/constants/expiry-constraints"
import { SendLinkLocalPacket } from "../../ilp-connector/functions/send-link-local-packet"
import type { DassieIlpAddress } from "../../ilp-connector/types/ilp-address"
import UplinkAddressQueryFailure from "../failures/uplink-address-query"
import type { NodeId } from "../types/node-id"

export function QueryUplinkAddress(reactor: DassieReactor) {
const sendLinkLocalPacket = reactor.use(SendLinkLocalPacket)

return async function queryUplinkAddress(peerId: NodeId) {
const ildcpRequestPacket = {
amount: 0n,
destination: ILDCP_ADDRESS,
executionCondition: ILDCP_CONDITION,
expiresAt: timestampToInterledgerTime(Date.now() + MAXIMUM_HOLD_TIME),
data: new Uint8Array(),
}

const result = await sendLinkLocalPacket({
packet: ildcpRequestPacket,
peerId,
})

if (result.type === IlpType.Reject) {
return new UplinkAddressQueryFailure(
"ILDCP Request Was Rejected: " + result.data.message,
)
}

const ildcpResponse = parseIldcpResponse(result.data.data)

if (isFailure(ildcpResponse)) {
return new UplinkAddressQueryFailure(
"Failed to parse ILDCP response: " + ildcpResponse.message,
)
}

return ildcpResponse.address as DassieIlpAddress
}
}
2 changes: 2 additions & 0 deletions packages/app-dassie/src/peer-protocol/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { MaintainOwnNodeTableEntryActor } from "./maintain-own-node-table-entry"
import { MaintainPeeringRelationshipsActor } from "./maintain-peering-relationships"
import { PersistNodeTableActor } from "./persist-node-table"
import { PollNodeListHashesActor } from "./poll-node-list-hashes"
import { ReceiveAddressFromUplinkActor } from "./receive-address-from-uplink"
import { RefreshNodeStateActor } from "./refresh-node-state"
import { RegisterPeerHttpHandlerActor } from "./register-peer-http-handler"
import { SendHeartbeatsActor } from "./send-heartbeats"
Expand All @@ -29,6 +30,7 @@ export const PeerProtocolActor = () =>
sig.run(AddMajorityNodesActor)
sig.run(BroadcastStateUpdatesActor)
await sig.run(RefreshNodeStateActor)
sig.runMap(ReceiveAddressFromUplinkActor)

sig.runMap(CreatePeerLedgerEntriesActor)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { createActor, createMapped } from "@dassie/lib-reactive"
import { isFailure, tell } from "@dassie/lib-type-utils"

import type { DassieReactor } from "../base/types/dassie-base"
import { peerProtocol as logger } from "../logger/instances"
import { PeersSignal } from "./computed/peers"
import { QueryUplinkAddress } from "./functions/query-uplink-address"
import { UplinkAddressesStore } from "./stores/uplink-addresses"

export const ReceiveAddressFromUplinkActor = (reactor: DassieReactor) => {
const queryUplinkAddress = reactor.use(QueryUplinkAddress)
const uplinkAddressesStore = reactor.use(UplinkAddressesStore)
return createMapped(reactor, PeersSignal, (peerId) =>
createActor((sig) => {
tell(async () => {
const uplinkAddress = await queryUplinkAddress(peerId)

// Ignore result if the actor is already disposed
if (sig.isDisposed) return

if (isFailure(uplinkAddress)) {
logger.error("failed to get address from uplink")
uplinkAddressesStore.act.updateAddress(peerId, undefined)
return
}

uplinkAddressesStore.act.updateAddress(peerId, uplinkAddress)
})
}),
)
}
20 changes: 20 additions & 0 deletions packages/app-dassie/src/peer-protocol/stores/uplink-addresses.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { enableMapSet, produce } from "immer"

import { createStore } from "@dassie/lib-reactive"

import type { IlpAddress } from "../../ilp-connector/types/ilp-address"
import type { NodeId } from "../types/node-id"

enableMapSet()

export const UplinkAddressesStore = () =>
createStore(new Map<NodeId, IlpAddress>()).actions({
updateAddress: (peer: NodeId, newAddress: IlpAddress | undefined) =>
produce((draft) => {
if (!newAddress) {
draft.delete(peer)
return
}
draft.set(peer, newAddress)
}),
})

0 comments on commit f986378

Please sign in to comment.