Allow retransmission requests for singular segments #22

Merged: 8 commits, Oct 16, 2024
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -28,4 +28,6 @@ coverage/
node_modules/

# Created by github pipeline
gha-creds-*.json
gha-creds-*.json

*.sqlite3
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Specify platform to be amd because arm doesn't work
FROM --platform=linux/amd64 node:18-alpine as builder
FROM --platform=linux/amd64 node:20-alpine as builder

Check warning on line 2 in Dockerfile (GitHub Actions / Publish, GitHub Actions / Merge PR):

FromAsCasing: 'as' and 'FROM' keywords' casing do not match. More info: https://docs.docker.com/go/dockerfile/rule/from-as-casing/

FromPlatformFlagConstDisallowed: FROM --platform flag should not use constant value "linux/amd64". More info: https://docs.docker.com/go/dockerfile/rule/from-platform-flag-const-disallowed/

RUN apk upgrade --no-cache

Expand All @@ -20,7 +20,7 @@
yarn install --production --frozen-lockfile

# Specify platform to be amd because arm doesn't work
FROM --platform=linux/amd64 alpine:3 as runner

Check warning on line 23 in Dockerfile (GitHub Actions / Publish, GitHub Actions / Merge PR):

FromAsCasing: 'as' and 'FROM' keywords' casing do not match. More info: https://docs.docker.com/go/dockerfile/rule/from-as-casing/

FromPlatformFlagConstDisallowed: FROM --platform flag should not use constant value "linux/amd64". More info: https://docs.docker.com/go/dockerfile/rule/from-platform-flag-const-disallowed/

ENV LOGGER=false

Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -32,7 +32,7 @@
"typescript": "^5.4.5"
},
"dependencies": {
"@hoprnet/uhttp-lib": "^3.3.0",
"@hoprnet/uhttp-lib": "^3.7.2",
"debug": "^4.3.4",
"sqlite3": "^5.1.7",
"ws": "^8.18.0"
Expand Down
25 changes: 25 additions & 0 deletions src/db.ts
@@ -0,0 +1,25 @@
import sqlite3 from 'sqlite3';

export type DB = sqlite3.Database;

export function setup(dbFile: string): Promise<DB> {
return new Promise((res, rej) => {
const db: sqlite3.Database = new sqlite3.Database(dbFile, (err) => {
if (err) {
return rej(`Error creating db: ${err}`);
}
return res(db);
});
});
}

export function close(db: DB): Promise<void> {
return new Promise((res, rej) => {
db.close((err) => {
if (err) {
return rej(`Error closing db: ${err}`);
}
return res();
});
});
}
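The new db.ts wraps sqlite3's error-first callbacks in Promises, resolving with the handle created inside its own open callback. The same pattern can be sketched generically (no sqlite3 dependency; `promisifyOpen` and the fake handle below are illustrative, not part of the PR):

```typescript
// Generic sketch of the error-first-callback-to-Promise pattern used in db.ts.
// `promisifyOpen` is a hypothetical helper for illustration only.
type NodeCallback = (err: Error | null) => void;

function promisifyOpen<T>(make: (cb: NodeCallback) => T): Promise<T> {
    return new Promise((res, rej) => {
        // Like `new sqlite3.Database(file, cb)`: the constructor returns the
        // handle immediately and reports success/failure via the callback.
        const handle = make((err) => {
            if (err) {
                return rej(`Error creating db: ${err}`);
            }
            return res(handle);
        });
    });
}

// Usage with a fake "database" that opens asynchronously and never fails:
async function demo(): Promise<string> {
    const db = await promisifyOpen((cb) => {
        setTimeout(() => cb(null), 0); // simulate async open
        return 'fake-db-handle';
    });
    return db;
}
```

Note the pattern relies on the callback firing asynchronously, as sqlite3's does, so `handle` is assigned before the callback runs.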
181 changes: 131 additions & 50 deletions src/index.ts
Expand Up @@ -13,9 +13,11 @@ import {
Utils,
} from '@hoprnet/uhttp-lib';

import log from './logger';
import * as DB from './db';
import * as RequestStore from './request-store';
import * as ResponseSegmentStore from './response-segment-store';
import Version from './version';
import log from './logger';

// WebSocket heartbeats
const HeartBeatInterval = 30e3; // 30 sec
Expand All @@ -28,7 +30,9 @@ const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min
// reconnect timeout for the websocket after closure
const SocketReconnectTimeout = 3e3; // 3 sec
// Time period in which counters for crypto are considered valid
const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour
const ValidCounterPeriod = 1e3 * 60 * 60; // 1 hour
// Time period to keep segments for potential retransfer
const ValidResponseSegmentPeriod = 1e3 * 60 * 10; // 10 min

type State = {
socket?: WebSocket;
Expand All @@ -37,7 +41,7 @@ type State = {
peerId: string;
cache: SegmentCache.Cache;
deleteTimer: Map<string, ReturnType<typeof setTimeout>>; // deletion timer of requests in segment cache
requestStore: RequestStore.RequestStore;
db: DB.DB;
relays: string[];
heartbeatInterval?: ReturnType<typeof setInterval>;
};
Expand All @@ -54,6 +58,7 @@ type Ops = {
accessToken: string;
discoveryPlatform?: OpsDP;
dbFile: string;
pinnedFetch: typeof globalThis.fetch;
};

type Msg = {
Expand All @@ -66,7 +71,8 @@ async function start(ops: Ops) {
try {
const state = await setup(ops);
setupSocket(state, ops);
removeExpired(state);
removeExpiredRequests(state);
removeExpiredSegmentResponses(state);
setupRelays(state, ops);
} catch (err) {
log.error(
Expand All @@ -79,13 +85,16 @@ async function start(ops: Ops) {
}

async function setup(ops: Ops): Promise<State> {
const requestStore = await RequestStore.setup(ops.dbFile).catch((err) => {
log.error('error setting up request store: %o', err);
const db = await DB.setup(ops.dbFile).catch((err) => {
log.error('error setting up database: %o', err);
});
if (!requestStore) {
throw new Error('No request store');
if (!db) {
throw new Error('No database');
Comment on lines +88 to +92
⚠️ Potential issue

Improve Error Handling When Setting Up the Database

Currently, if DB.setup(ops.dbFile) fails, the error is logged but not re-thrown, and the code proceeds to check if db is undefined to throw a generic error. This can mask the original error and make debugging more difficult. It's better to re-throw the original error or include its message when throwing the new error to provide clearer error information.

Here's a suggested fix:

async function setup(ops: Ops): Promise<State> {
    const db = await DB.setup(ops.dbFile).catch((err) => {
        log.error('error setting up database: %o', err);
+       throw err;
    });
-   if (!db) {
-       throw new Error('No database');
-   }
    await RequestStore.setup(db);
    await ResponseSegmentStore.setup(db);
    log.verbose('set up DB at', ops.dbFile);
    // ...
}

Committable suggestion was skipped due to low confidence.

}

await RequestStore.setup(db);
await ResponseSegmentStore.setup(db);

log.verbose('set up DB at', ops.dbFile);

const resPeerId = await NodeApi.accountAddresses(ops).catch((err: Error) => {
Expand All @@ -96,7 +105,8 @@ async function setup(ops: Ops): Promise<State> {
}

const { hopr: peerId } = resPeerId;
const cache = SegmentCache.init();
// we don't care for missing segments reminder in our cache
const cache = SegmentCache.init(function () {});
const deleteTimer = new Map();

const logOpts: Record<string, string> = {
Expand All @@ -114,7 +124,7 @@ async function setup(ops: Ops): Promise<State> {
privateKey: Utils.hexStringToBytes(ops.privateKey),
publicKey: Utils.hexStringToBytes(ops.publicKey),
peerId,
requestStore,
db,
relays: [],
};
}
Expand Down Expand Up @@ -156,27 +166,48 @@ function setupSocket(state: State, ops: Ops) {
state.socket = socket;
}

function removeExpired(state: State) {
RequestStore.removeExpired(state.requestStore, ValidCounterPeriod)
function removeExpiredSegmentResponses(state: State) {
ResponseSegmentStore.removeExpired(state.db, Date.now() - ValidResponseSegmentPeriod)
.then(() => {
log.info('successfully removed expired response segments from db');
})
.catch((err) => {
log.error('error during removeExpiredSegmentResponses: %o', err);
})
.finally(() => {
scheduleRemoveExpiredSegmentResponses(state);
});
}

function removeExpiredRequests(state: State) {
RequestStore.removeExpired(state.db, Date.now() - ValidCounterPeriod)
.then(() => {
log.info('successfully removed expired requests from store');
log.info('successfully removed expired requests from db');
})
.catch((err) => {
log.error('error during removeExpired: %o', err);
log.error('error during removeExpiredRequests: %o', err);
})
.finally(() => {
scheduleRemoveExpired(state);
scheduleRemoveExpiredRequests(state);
});
}

function scheduleRemoveExpired(state: State) {
function scheduleRemoveExpiredSegmentResponses(state: State) {
const logM = ValidResponseSegmentPeriod / 1000 / 60;
log.info('scheduling next remove expired response segments in %dm', logM);
setTimeout(() => {
removeExpiredSegmentResponses(state);
}, ValidResponseSegmentPeriod);
}

function scheduleRemoveExpiredRequests(state: State) {
// schedule next run somewhere between 1h and 1h10m
const next = ValidCounterPeriod + Math.floor(Math.random() * 10 * 60e3);
const logH = Math.floor(next / 1000 / 60 / 60);
const logM = Math.round(next / 1000 / 60) - logH * 60;

log.info('scheduling next remove expired requests in %dh%dm', logH, logM);
setTimeout(() => removeExpired(state), next);
setTimeout(() => removeExpiredRequests(state), next);
}
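The jitter arithmetic above spreads cleanup runs over a ten-minute window past the one-hour counter period, and derives the `XhYm` log label from the resulting delay. Extracted as a pure function for illustration (`computeNextRun` is not part of the PR; the committed code inlines this logic):

```typescript
// Sketch of the jittered scheduling arithmetic in scheduleRemoveExpiredRequests.
const ValidCounterPeriod = 1e3 * 60 * 60; // 1 hour

function computeNextRun(rand: number = Math.random()): { next: number; label: string } {
    // somewhere between 1h and 1h10m
    const next = ValidCounterPeriod + Math.floor(rand * 10 * 60e3);
    const logH = Math.floor(next / 1000 / 60 / 60);
    const logM = Math.round(next / 1000 / 60) - logH * 60;
    return { next, label: `${logH}h${logM}m` };
}
```

With `rand = 0` this yields exactly one hour (`1h0m`); as `rand` approaches 1 it approaches `1h10m`, so concurrent exit nodes don't all hit the database at the same moment.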

async function setupRelays(state: State, ops: Ops) {
Expand Down Expand Up @@ -243,6 +274,11 @@ function onMessage(state: State, ops: Ops) {
return onInfoReq(state, ops, msg);
}

// determine if segment retransfer
if (msg.body.startsWith('resg;')) {
return onRetransferSegmentsReq(state, ops, msg);
}

// determine if valid segment
const segRes = Segment.fromMessage(msg.body);
if (Res.isErr(segRes)) {
Expand Down Expand Up @@ -306,7 +342,7 @@ function onPingReq(state: State, ops: Ops, msg: Msg) {

function onInfoReq(state: State, ops: Ops, msg: Msg) {
log.info('received info req:', msg.body);
// info-originPeerId-hops
// info-originPeerId-hops-manualRelay
const [, recipient, hopsStr, reqRel] = msg.body.split('-');
const hops = parseInt(hopsStr, 10);
const conn = { ...ops, hops };
Expand All @@ -333,6 +369,40 @@ function onInfoReq(state: State, ops: Ops, msg: Msg) {
});
}

function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) {
log.info('received retransfer segments req:', msg.body);
// resg;originPeerId;hops;requestId;segmentNrs
const [, recipient, hopsStr, requestId, rawSegNrs] = msg.body.split(';');
const hops = parseInt(hopsStr, 10);
const conn = { ...ops, hops };
const segNrs = rawSegNrs.split(',').map((s) => parseInt(s));

Comment on lines +374 to +379
⚠️ Potential issue

Add input validation when parsing message in onRetransferSegmentsReq

The code assumes that msg.body.split(';') will always return an array with at least five elements. If the message is malformed or doesn't conform to the expected format, accessing array elements without checking could result in undefined values and potential runtime errors. It's advisable to add input validation to ensure that all expected fields are present and valid before using them.

Apply this diff to add input validation:

 function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) {
     log.info('received retransfer segments req:', msg.body);
-    // resg;originPeerId;hops;requestId;segmentNrs
-    const [, recipient, hopsStr, requestId, rawSegNrs] = msg.body.split(';');
+    // Expected format: resg;originPeerId;hops;requestId;segmentNrs
+    const parts = msg.body.split(';');
+    if (parts.length < 5) {
+        log.error('Malformed retransfer segments request message');
+        return;
+    }
+    const [, recipient, hopsStr, requestId, rawSegNrs] = parts;
     const hops = parseInt(hopsStr, 10);
     const conn = { ...ops, hops };
     const segNrs = rawSegNrs.split(',').map((s) => parseInt(s));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    // resg;originPeerId;hops;requestId;segmentNrs
-    const [, recipient, hopsStr, requestId, rawSegNrs] = msg.body.split(';');
+    // Expected format: resg;originPeerId;hops;requestId;segmentNrs
+    const parts = msg.body.split(';');
+    if (parts.length < 5) {
+        log.error('Malformed retransfer segments request message');
+        return;
+    }
+    const [, recipient, hopsStr, requestId, rawSegNrs] = parts;
     const hops = parseInt(hopsStr, 10);
     const conn = { ...ops, hops };
     const segNrs = rawSegNrs.split(',').map((s) => parseInt(s));

Comment on lines +375 to +379
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate Message Structure and Inputs in onRetransferSegmentsReq

The function onRetransferSegmentsReq splits msg.body without validating its structure. If msg.body doesn't have the expected format, accessing array elements may result in undefined, leading to runtime errors. Similarly, parsing hopsStr and segment numbers without validation may result in NaN or unexpected behavior.

Consider adding validation to ensure that msg.body splits into the expected number of parts and that parsed values are valid:

function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) {
    log.info('received retransfer segments req:', msg.body);
    // resg;originPeerId;hops;requestId;segmentNrs
    const parts = msg.body.split(';');
+   if (parts.length !== 5) {
+       log.error('Invalid resg message format: %s', msg.body);
+       return;
+   }
    const [, recipient, hopsStr, requestId, rawSegNrs] = parts;
    const hops = parseInt(hopsStr, 10);
+   if (isNaN(hops)) {
+       log.error('Invalid hops value in resg message: %s', hopsStr);
+       return;
+   }
    const conn = { ...ops, hops };
    const segNrs = rawSegNrs.split(',').map((s) => parseInt(s));
+   if (segNrs.some(isNaN)) {
+       log.error('Invalid segment numbers in resg message: %s', rawSegNrs);
+       return;
+   }
    // ...
}

Committable suggestion was skipped due to low confidence.

log.verbose('reading segment nrs for %s from db: %o', requestId, segNrs);
ResponseSegmentStore.all(state.db, requestId, segNrs)
.then((segments) => {
segments.forEach((seg, idx) => {
setTimeout(() => {
const segLog = Segment.prettyPrint(seg);
log.verbose('retransferring %s to %s', segLog, recipient);
NodeApi.sendMessage(conn, {
recipient,
tag: msg.tag,
message: Segment.toMessage(seg),
})
.then((r) => {
log.verbose('retransfer response %s: %o', segLog, r);
})
.catch((err: Error) => {
log.error('error retransferring %s: %o', Segment.prettyPrint(seg), err);
});
}, idx * 10);
});
})
.catch((err) => {
log.error('error reading response segments: %o', err);
});
}
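Both reviewers flagged that `onRetransferSegmentsReq` destructures and parses the `resg;originPeerId;hops;requestId;segmentNrs` wire format without validation. A standalone parser combining their suggested checks might look like this (`parseRetransferReq` is a hypothetical helper, not part of the committed code):

```typescript
// Sketch of validated parsing for the retransfer request wire format:
//   resg;originPeerId;hops;requestId;segmentNrs
type RetransferReq = {
    recipient: string;
    hops: number;
    requestId: string;
    segNrs: number[];
};

function parseRetransferReq(body: string): RetransferReq | null {
    const parts = body.split(';');
    if (parts.length < 5) {
        return null; // malformed message
    }
    const [, recipient, hopsStr, requestId, rawSegNrs] = parts;
    const hops = parseInt(hopsStr, 10);
    if (isNaN(hops)) {
        return null; // non-numeric hops
    }
    const segNrs = rawSegNrs.split(',').map((s) => parseInt(s, 10));
    if (segNrs.some((n) => isNaN(n))) {
        return null; // non-numeric segment number
    }
    return { recipient, hops, requestId, segNrs };
}
```

Returning `null` lets the caller log and drop the malformed message instead of propagating `undefined` or `NaN` into the database query.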

async function completeSegmentsEntry(
state: State,
ops: Ops,
Expand Down Expand Up @@ -382,7 +452,7 @@ async function completeSegmentsEntry(
}

// check uuid
const res = await RequestStore.addIfAbsent(state.requestStore, requestId, counter);
const res = await RequestStore.addIfAbsent(state.db, requestId, counter);
if (res === RequestStore.AddRes.Duplicate) {
log.info('duplicate request id:', requestId);
// duplicate fail resp
Expand All @@ -391,22 +461,19 @@ async function completeSegmentsEntry(

// do actual endpoint request
const fetchStartedAt = performance.now();
const resFetch = await EndpointApi.fetchUrl(reqPayload.endpoint, reqPayload).catch(
(err: Error) => {
log.error(
'error doing RPC req on %s with %o: %o',
reqPayload.endpoint,
reqPayload,
err,
);
// HTTP critical fail response
const resp: Payload.RespPayload = {
type: Payload.RespType.Error,
reason: err.toString(),
};
return sendResponse(sendParams, resp);
},
);
const resFetch = await EndpointApi.fetchUrl(
ops.pinnedFetch,
reqPayload.endpoint,
reqPayload,
).catch((err: Error) => {
log.error('error doing RPC req on %s with %o: %o', reqPayload.endpoint, reqPayload, err);
// HTTP critical fail response
const resp: Payload.RespPayload = {
type: Payload.RespType.Error,
reason: err.toString(),
};
return sendResponse(sendParams, resp);
});
if (!resFetch) {
return;
}
Expand Down Expand Up @@ -511,36 +578,47 @@ function sendResponse(
};

// queue segment sending for all of them
segments.forEach((seg: Segment.Segment) => {
NodeApi.sendMessage(conn, {
recipient: entryPeerId,
tag,
message: Segment.toMessage(seg),
}).catch((err: Error) => {
log.error('error sending %s: %o', Segment.prettyPrint(seg), err);
// remove relay if it fails
state.relays = state.relays.filter((r) => r !== relay);
});
const ts = Date.now();
segments.forEach((seg: Segment.Segment, idx) => {
const segLog = Segment.prettyPrint(seg);
log.verbose('putting %s into db at %d ', segLog, ts);
ResponseSegmentStore.put(state.db, seg, ts);
setTimeout(() => {
log.verbose('sending %s to %s', segLog, entryPeerId);
NodeApi.sendMessage(conn, {
recipient: entryPeerId,
tag,
message: Segment.toMessage(seg),
})
.then((r) => {
log.verbose('response %s: %o', segLog, r);
})
.catch((err: Error) => {
log.error('error sending %s: %o', Segment.prettyPrint(seg), err);
// remove relay if it fails
state.relays = state.relays.filter((r) => r !== relay);
});
Comment on lines +597 to +600
🛠️ Refactor suggestion

Consider adding retry logic before removing a relay from the list

Currently, any error encountered while sending a segment results in the relay being immediately removed from state.relays. This might be too aggressive, as transient network issues could cause relays to be removed unnecessarily. Consider implementing a retry mechanism or tracking error counts to remove a relay only after multiple consecutive failures.

Comment on lines +597 to +600
⚠️ Potential issue

Handle Potential Undefined 'relay' When Filtering Relays

In the error handling of sendResponse, if sending a message fails, the code attempts to remove the relay from state.relays. However, since relay may be undefined, this could inadvertently modify state.relays incorrectly.

It's advisable to check if relay is defined before attempting to remove it:

.catch((err: Error) => {
    log.error('error sending %s: %o', Segment.prettyPrint(seg), err);
    // remove relay if it fails
+   if (relay) {
        state.relays = state.relays.filter((r) => r !== relay);
+   }
});
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
     log.error('error sending %s: %o', Segment.prettyPrint(seg), err);
     // remove relay if it fails
-    state.relays = state.relays.filter((r) => r !== relay);
+    if (relay) {
+        state.relays = state.relays.filter((r) => r !== relay);
+    }
 });

}, idx * 10);
});
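The send loop above schedules each segment `idx * 10` ms apart rather than firing all `sendMessage` calls at once, which spaces the outbound traffic. The stagger can be sketched as a small helper (`staggered` is illustrative, not part of the PR):

```typescript
// Sketch of the 10 ms send stagger used when queueing segments.
function staggered<T>(items: T[], stepMs: number, fn: (item: T) => void): void {
    items.forEach((item, idx) => {
        // item 0 fires immediately, item 1 after stepMs, item 2 after 2*stepMs, ...
        setTimeout(() => fn(item), idx * stepMs);
    });
}
```

Because each timer is independent, a slow or failed send for one segment does not delay the others; ordering is preserved only by the increasing delays.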

if (ops.discoveryPlatform) {
reportToDiscoveryPlatform({
ops,
cacheEntry,
opsDP: ops.discoveryPlatform,
reqPayload,
segments,
});
}
}

async function reportToDiscoveryPlatform({
ops,
cacheEntry,
opsDP,
reqPayload,
segments,
}: {
cacheEntry: SegmentCache.Entry;
opsDP: OpsDP;
ops: Ops;
reqPayload: Payload.ReqPayload;
segments: Segment.Segment[];
}) {
Expand Down Expand Up @@ -570,9 +648,11 @@ async function reportToDiscoveryPlatform({
type: 'response',
};

const dp = ops.discoveryPlatform as OpsDP;
const conn = {
discoveryPlatformEndpoint: opsDP.endpoint,
nodeAccessToken: opsDP.nodeAccessToken,
discoveryPlatformEndpoint: dp.endpoint,
nodeAccessToken: dp.nodeAccessToken,
pinnedFetch: ops.pinnedFetch,
};
DpApi.postQuota(conn, quotaRequest).catch((err) => {
log.error('error recording request quota: %o', err);
Expand Down Expand Up @@ -644,5 +724,6 @@ if (require.main === module) {
accessToken: process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN,
discoveryPlatform,
dbFile: process.env.UHTTP_EA_DATABASE_FILE,
pinnedFetch: globalThis.fetch.bind(globalThis),

⚠️ Potential issue

Ensure Compatibility of 'globalThis.fetch' Across Node.js Versions

The use of globalThis.fetch may not be supported in Node.js versions prior to 18. If this code is intended to run in environments with older Node.js versions, globalThis.fetch will be undefined, leading to runtime errors.

Consider adding a polyfill or using a library like node-fetch to ensure fetch is available:

+import fetch from 'node-fetch';

start({
    privateKey: process.env.UHTTP_EA_PRIVATE_KEY,
    publicKey: process.env.UHTTP_EA_PUBLIC_KEY,
    // ...
-   pinnedFetch: globalThis.fetch.bind(globalThis),
+   pinnedFetch: fetch,
});

Alternatively, ensure that the runtime environment is Node.js v18 or later where globalThis.fetch is available.

Committable suggestion was skipped due to low confidence.

});
}