From fad77d586d21031ed9d986830e594d31e0f2434a Mon Sep 17 00:00:00 2001
From: Ronny Esterluss
Date: Tue, 15 Oct 2024 12:53:44 +0200
Subject: [PATCH 1/8] initial sqlite tables

---
 src/db.ts             | 25 ++++++++++++++
 src/request-store.ts  | 31 +++--------------
 src/response-store.ts | 77 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 107 insertions(+), 26 deletions(-)
 create mode 100644 src/db.ts
 create mode 100644 src/response-store.ts

diff --git a/src/db.ts b/src/db.ts
new file mode 100644
index 0000000..7f2d19d
--- /dev/null
+++ b/src/db.ts
@@ -0,0 +1,25 @@
+import sqlite3 from 'sqlite3';
+
+export type DB = sqlite3.Database;
+
+export function setup(dbFile: string): Promise<DB> {
+    return new Promise((res, rej) => {
+        const db: sqlite3.Database = new sqlite3.Database(dbFile, (err) => {
+            if (err) {
+                return rej(`Error creating db: ${err}`);
+            }
+            return res(db);
+        });
+    });
+}
+
+export function close(db: DB): Promise<void> {
+    return new Promise((res, rej) => {
+        db.close((err) => {
+            if (err) {
+                return rej(`Error closing db: ${err}`);
+            }
+            return res();
+        });
+    });
+}
diff --git a/src/request-store.ts b/src/request-store.ts
index 8138224..9fed044 100644
--- a/src/request-store.ts
+++ b/src/request-store.ts
@@ -1,22 +1,12 @@
-import sqlite3 from 'sqlite3';
-
-export type RequestStore = {
-    db: sqlite3.Database;
-};
+import type { DB } from './db';
 
 export enum AddRes {
     Success,
     Duplicate,
 }
 
-export function setup(dbFile: string): Promise<RequestStore> {
+export function setup(db: DB): Promise<void> {
     return new Promise((res, rej) => {
-        const db = new sqlite3.Database(dbFile, (err) => {
-            if (err) {
-                return rej(`Error creating db: ${err}`);
-            }
-        });
-
         db.serialize(() => {
             db.run(
                 'CREATE TABLE IF NOT EXISTS request_store (uuid TEXT PRIMARY KEY, counter INTEGER)',
@@ -33,14 +23,14 @@
                     if (err) {
                         return rej(`Error creating index request_store_counter_index: ${err}`);
                     }
-                    return res({ db });
+                    return res();
                 },
             );
         });
     });
 }
 
-export function addIfAbsent({ db }: RequestStore, id: string, counter: number): Promise<AddRes> {
+export function addIfAbsent(db: DB, id: string, counter: number): Promise<AddRes> {
     return new Promise((res, rej) => {
         db.run(
             'INSERT INTO request_store (uuid, counter) VALUES ($id, $counter);',
@@ -63,7 +53,7 @@
     });
 }
 
-export function removeExpired({ db }: RequestStore, olderThan: number): Promise<void> {
+export function removeExpired(db: DB, olderThan: number): Promise<void> {
     return new Promise((res, rej) => {
         db.run(
             'DELETE FROM request_store where counter < $counter;',
@@ -77,14 +67,3 @@
         );
     });
 }
-
-export function close({ db }: RequestStore): Promise<void> {
-    return new Promise((res, rej) => {
-        db.close((err) => {
-            if (err) {
-                return rej(`Error closing db: ${err}`);
-            }
-            return res();
-        });
-    });
-}
diff --git a/src/response-store.ts b/src/response-store.ts
new file mode 100644
index 0000000..cbaf67e
--- /dev/null
+++ b/src/response-store.ts
@@ -0,0 +1,77 @@
+import type { DB } from './db';
+
+export function setup(db: DB) {
+    return new Promise((res, rej) => {
+        // requestId - uuid
+        // segment nr - integer, indexed
+        // segment body - blob, content
+        // inserted_at - integer, timestamp, indexed
+        db.serialize(() => {
+            db.run(
+                'CREATE TABLE IF NOT EXISTS response_store (uuid TEXT PRIMARY KEY, nr INTEGER, body BLOB, inserted_at INTEGER)',
+                (err) => {
+                    if (err) {
+                        return rej(`Error creating table response_store: ${err}`);
+                    }
+                },
+            );
+
+            db.run(
+                'CREATE INDEX IF NOT EXISTS response_store_inserted_at_index ON response_store (inserted_at)',
+                (err) => {
+                    if (err) {
+                        return rej(`Error creating index response_store_inserted_at_index: ${err}`);
+                    }
+                },
+            );
+
+            db.run(
+                'CREATE INDEX IF NOT EXISTS response_store_nr_index ON response_store (nr)',
+                (err) => {
+                    if (err) {
+                        return rej(`Error creating index response_store_nr_index: ${err}`);
+                    }
+                    return res({ db });
+                },
+            );
+        });
+    });
+}
+
+export function put(db: DB, requestId: string, segment: segment): Promise {
+    return new Promise((res, rej) => {
+        db.run(
+            'INSERT INTO request_store (uuid, counter) VALUES ($id, $counter);',
+            {
+                $id: id,
+                $counter: counter,
+            },
+            (err) => {
+                if (err) {
+                    const errStr = String(err).toLowerCase();
+                    const cts = ['sqlite_constraint', 'unique', 'request_store.uuid'];
+                    if (cts.every((c) => errStr.includes(c))) {
+                        return res(AddRes.Duplicate);
+                    }
+                    return rej(`Error inserting into request_store: ${err}`);
+                }
+                return res(AddRes.Success);
+            },
+        );
+    });
+}
+
+export function removeExpired(db: DB, olderThan: number): Promise {
+    return new Promise((res, rej) => {
+        db.run(
+            'DELETE FROM response_store where inserted_at < $counter;',
+            { $counter: olderThan },
+            (err) => {
+                if (err) {
+                    return rej(`Error deleting from response_store: ${err}`);
+                }
+                return res();
+            },
+        );
+    });
+}
From 285b5d0383d18ed212d7f6e662f200819337e240 Mon Sep 17 00:00:00 2001
From: Ronny Esterluss
Date: Tue, 15 Oct 2024 14:02:25 +0200
Subject: [PATCH 2/8] exit app can now retransfer requested segments

---
 src/index.ts                  | 117 ++++++++++++++++++++++++++--------
 src/response-segment-store.ts | 115 +++++++++++++++++++++++++++++++++
 src/response-store.ts         |  77 ----------------------
 3 files changed, 205 insertions(+), 104 deletions(-)
 create mode 100644 src/response-segment-store.ts
 delete mode 100644 src/response-store.ts

diff --git a/src/index.ts b/src/index.ts
index 98bc03c..498c0c5 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -13,9 +13,11 @@ import {
     Utils,
 } from '@hoprnet/uhttp-lib';
 
-import log from './logger';
+import * as DB from './db';
 import * as RequestStore from './request-store';
+import * as ResponseSegmentStore from './response-segment-store';
 import Version from './version';
+import log from './logger';
 
 // WebSocket heartbeats
 const HeartBeatInterval = 30e3; // 30 sec
@@ -28,7 +30,9 @@
 const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min
 // reconnect timeout for the websocket after closure
 const SocketReconnectTimeout = 3e3; // 3 sec
 // Time period in which counters for crypto are considered valid
-const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour
+const ValidCounterPeriod = 1e3 * 60 * 60; // 1 hour
+// Time period to keep segments for potential retransfer
+const ValidResponseSegmentPeriod = 1e3 * 60 * 10; // 10 min
 
 type State = {
     socket?: WebSocket;
@@ -37,7 +41,7 @@ type State = {
     peerId: string;
     cache: SegmentCache.Cache;
     deleteTimer: Map<string, ReturnType<typeof setTimeout>>; // deletion timer of requests in segment cache
-    requestStore: RequestStore.RequestStore;
+    db: DB.DB;
     relays: string[];
     heartbeatInterval?: ReturnType<typeof setInterval>;
 };
@@ -66,7 +70,8 @@ async function start(ops: Ops) {
     try {
         const state = await setup(ops);
         setupSocket(state, ops);
-        removeExpired(state);
+        removeExpiredRequests(state);
+        removeExpiredSegmentResponses(state);
         setupRelays(state, ops);
     } catch (err) {
         log.error(
@@ -79,13 +84,16 @@ async function start(ops: Ops) {
 }
 
 async function setup(ops: Ops): Promise {
-    const requestStore = await 
RequestStore.setup(ops.dbFile).catch((err) => { - log.error('error setting up request store: %o', err); + const db = await DB.setup(ops.dbFile).catch((err) => { + log.error('error setting up database: %o', err); }); - if (!requestStore) { - throw new Error('No request store'); + if (!db) { + throw new Error('No database'); } + await RequestStore.setup(db); + await ResponseSegmentStore.setup(db); + log.verbose('set up DB at', ops.dbFile); const resPeerId = await NodeApi.accountAddresses(ops).catch((err: Error) => { @@ -114,7 +122,7 @@ async function setup(ops: Ops): Promise { privateKey: Utils.hexStringToBytes(ops.privateKey), publicKey: Utils.hexStringToBytes(ops.publicKey), peerId, - requestStore, + db, relays: [], }; } @@ -156,27 +164,46 @@ function setupSocket(state: State, ops: Ops) { state.socket = socket; } -function removeExpired(state: State) { - RequestStore.removeExpired(state.requestStore, ValidCounterPeriod) +function removeExpiredSegmentResponses(state: State) { + ResponseSegmentStore.removeExpired(state.db, Date.now() - ValidResponseSegmentPeriod) .then(() => { - log.info('successfully removed expired requests from store'); + log.info('successfully removed expired response segments from db'); }) .catch((err) => { - log.error('error during removeExpired: %o', err); + log.error('error during removeExpiredSegmentResponses: %o', err); }) .finally(() => { - scheduleRemoveExpired(state); + scheduleRemoveExpiredSegmentResponses(state); }); } -function scheduleRemoveExpired(state: State) { +function removeExpiredRequests(state: State) { + RequestStore.removeExpired(state.db, Date.now() - ValidCounterPeriod) + .then(() => { + log.info('successfully removed expired requests from db'); + }) + .catch((err) => { + log.error('error during removeExpiredRequests: %o', err); + }) + .finally(() => { + scheduleRemoveExpiredRequests(state); + }); +} + +function scheduleRemoveExpiredSegmentResponses(state: State) { + setTimeout(function () { + removeExpiredSegmentResponses(state); + }, ValidResponseSegmentPeriod); +} + +function scheduleRemoveExpiredRequests(state: State) { // schedule next run somehwere between 1h and 1h and 10m const next = ValidCounterPeriod + Math.floor(Math.random() * 10 * 60e3); const logH = Math.floor(next / 1000 / 60 / 60); const logM = Math.round(next / 1000 / 60) - logH * 60; log.info('scheduling next remove expired requests in %dh%dm', logH, logM); - setTimeout(() => removeExpired(state), next); + setTimeout(() => removeExpiredRequests(state), next); } async function setupRelays(state: State, ops: Ops) { @@ -243,6 +270,11 @@ function onMessage(state: State, ops: Ops) { return onInfoReq(state, ops, msg); } + // determine if segment retransfer + if (msg.body.startsWith('resg-')) { + return onRetransferSegmentsReq(state, ops, msg); + } + // determine if valid segment const segRes = Segment.fromMessage(msg.body); if (Res.isErr(segRes)) { @@ -306,7 +338,7 @@ function onPingReq(state: State, ops: Ops, msg: Msg) { function onInfoReq(state: State, ops: Ops, msg: Msg) { log.info('received info req:', msg.body); - // info-originPeerId-hops + // info-originPeerId-hops-manualRelay const [, recipient, hopsStr, reqRel] = msg.body.split('-'); const hops = parseInt(hopsStr, 10); const conn = { ...ops, hops }; @@ -333,6 +365,33 @@ function onInfoReq(state: State, ops: Ops, msg: Msg) { }); } +function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) { + log.info('received retransfer segments req:', msg.body); + // resg-originPeerId-hops-requestId-segmentNrs + const [, 
recipient, hopsStr, requestId, rawSegNrs] = msg.body.split('-'); + const hops = parseInt(hopsStr, 10); + const conn = { ...ops, hops }; + const segNrs = rawSegNrs.split(',').map(parseInt); + + ResponseSegmentStore.all(state.db, requestId, segNrs) + .then((segments) => { + segments.map((seg, idx) => { + setTimeout(() => { + NodeApi.sendMessage(conn, { + recipient, + tag: msg.tag, + message: Segment.toMessage(seg), + }).catch((err: Error) => { + log.error('error retransferring %s: %o', Segment.prettyPrint(seg), err); + }); + }, idx); + }); + }) + .catch((err) => { + log.error('error reading response segments: %o', err); + }); +} + async function completeSegmentsEntry( state: State, ops: Ops, @@ -511,16 +570,20 @@ function sendResponse( }; // queue segment sending for all of them - segments.forEach((seg: Segment.Segment) => { - NodeApi.sendMessage(conn, { - recipient: entryPeerId, - tag, - message: Segment.toMessage(seg), - }).catch((err: Error) => { - log.error('error sending %s: %o', Segment.prettyPrint(seg), err); - // remove relay if it fails - state.relays = state.relays.filter((r) => r !== relay); - }); + const ts = Date.now(); + segments.forEach((seg: Segment.Segment, idx) => { + setTimeout(function () { + ResponseSegmentStore.put(state.db, seg, ts); + NodeApi.sendMessage(conn, { + recipient: entryPeerId, + tag, + message: Segment.toMessage(seg), + }).catch((err: Error) => { + log.error('error sending %s: %o', Segment.prettyPrint(seg), err); + // remove relay if it fails + state.relays = state.relays.filter((r) => r !== relay); + }); + }, idx); }); if (ops.discoveryPlatform) { diff --git a/src/response-segment-store.ts b/src/response-segment-store.ts new file mode 100644 index 0000000..00922de --- /dev/null +++ b/src/response-segment-store.ts @@ -0,0 +1,115 @@ +import { Segment } from '@hoprnet/uhttp-lib'; + +import type { DB } from './db'; + +export function setup(db: DB) { + return new Promise((res, rej) => { + // requestId - uuid + // segment nr - integer, indexed + // segment body - blob, content + // inserted_at - integer, timestamp, indexed + db.serialize(() => { + db.run( + [ + 'CREATE TABLE IF NOT EXISTS response_segment_store', + '(request_id TEXT NOT NULL,', + 'nr INTEGER NOT NULL,', + 'total_count INTEGER NOT NULL,', + 'body BLOB,', + 'inserted_at INTEGER,', + 'PRIMARY KEY (request_id, nr))', + ].join(' '), + (err) => { + if (err) { + return rej(`Error creating table response_segment_store: ${err}`); + } + }, + ); + + db.run( + 'CREATE INDEX IF NOT EXISTS response_store_inserted_at_index ON response_segment_store (inserted_at)', + (err) => { + if (err) { + return rej(`Error creating index response_store_inserted_at_index: ${err}`); + } + }, + ); + + db.run( + 'CREATE INDEX IF NOT EXISTS response_store_nr_index ON response_segment_store (nr)', + (err) => { + if (err) { + return rej(`Error creating index response_store_nr_index: ${err}`); + } + return res({ db }); + }, + ); + }); + }); +} + +export function put(db: DB, segment: Segment.Segment, insertedAt: number): Promise { + return new Promise((res, rej) => { + db.run( + [ + 'INSERT INTO response_segment_store (request_id, nr, total_count, body, inserted_at)', + 'VALUES ($requestId, $nr, $totalCount, $body, $insertedAt);', + ].join(' '), + { + $requestId: segment.requestId, + $nr: segment.nr, + $totalCount: segment.totalCount, + $body: segment.body, + $insertedAt: insertedAt, + }, + (err) => { + if (err) { + return rej(`Error inserting segment into response_segment_store: ${err}`); + } + return res(); + }, + ); + }); +} + 
+export function all(db: DB, requestId: string, segmentNrs: number[]): Promise { + return new Promise((res, rej) => { + db.all( + [ + 'SELECT nr, body, total_count FROM response_segment_store', + 'WHERE request_id = $requestId and nr in $nrs', + ].join(' '), + { $requestId: requestId, $nrs: segmentNrs }, + function (err, rows) { + if (err) { + return rej(`Error selecting segment from response_segment_store: ${err}`); + } + const segments = rows.map((rawRow) => { + const row = rawRow as { nr: number; total_count: number; body: string }; + return { + requestId, + nr: row.nr, + totalCount: row.total_count, + body: row.body, + }; + }); + return res(segments); + }, + ); + }); +} + +export function removeExpired(db: DB, olderThan: number): Promise { + return new Promise((res, rej) => { + db.run( + 'DELETE FROM response_segment_store where inserted_at < $olderThan;', + { $olderThan: olderThan }, + (err) => { + if (err) { + return rej(`Error deleting from response_segment_store: ${err}`); + } + return res(); + }, + ); + }); +} diff --git a/src/response-store.ts b/src/response-store.ts deleted file mode 100644 index cbaf67e..0000000 --- a/src/response-store.ts +++ /dev/null @@ -1,77 +0,0 @@ -import type { DB } from './db'; - -export function setup(db: DB) { - return new Promise((res, rej) => { - // requestId - uuid - // segment nr - integer, indexed - // segment body - blob, content - // inserted_at - integer, timestamp, indexed - db.serialize(() => { - db.run( - 'CREATE TABLE IF NOT EXISTS response_store (uuid TEXT PRIMARY KEY, nr INTEGER, body BLOB, inserted_at INTEGER)', - (err) => { - if (err) { - return rej(`Error creating table response_store: ${err}`); - } - }, - ); - - db.run( - 'CREATE INDEX IF NOT EXISTS response_store_inserted_at_index ON response_store (inserted_at)', - (err) => { - if (err) { - return rej(`Error creating index response_store_inserted_at_index: ${err}`); - } - }, - ); - - db.run( - 'CREATE INDEX IF NOT EXISTS response_store_nr_index ON response_store (nr)', - (err) => { - if (err) { - return rej(`Error creating index response_store_nr_index: ${err}`); - } - return res({ db }); - }, - ); - }); - }); -} - -export function put(db: DB, requestId: string, segment: segment): Promise { - return new Promise((res, rej) => { - db.run( - 'INSERT INTO request_store (uuid, counter) VALUES ($id, $counter);', - { - $id: id, - $counter: counter, - }, - (err) => { - if (err) { - const errStr = String(err).toLowerCase(); - const cts = ['sqlite_constraint', 'unique', 'request_store.uuid']; - if (cts.every((c) => errStr.includes(c))) { - return res(AddRes.Duplicate); - } - return rej(`Error inserting into request_store: ${err}`); - } - return res(AddRes.Success); - }, - ); - }); -} - -export function removeExpired(db: DB, olderThan: number): Promise { - return new Promise((res, rej) => { - db.run( - 'DELETE FROM response_store where inserted_at < $counter;', - { $counter: olderThan }, - (err) => { - if (err) { - return rej(`Error deleting from response_store: ${err}`); - } - return res(); - }, - ); - }); -} From babf83c542f42e12c374893e6ef469edc74f2ba8 Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Tue, 15 Oct 2024 14:22:16 +0200 Subject: [PATCH 3/8] fix compilaion errors --- src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 498c0c5..403143b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -441,7 +441,7 @@ async function completeSegmentsEntry( } // check uuid - const res = await RequestStore.addIfAbsent(state.requestStore, 
requestId, counter); + const res = await RequestStore.addIfAbsent(state.db, requestId, counter); if (res === RequestStore.AddRes.Duplicate) { log.info('duplicate request id:', requestId); // duplicate fail resp From 8d43c28b344a5ec047b9ea7b31da036ee9c3f4f7 Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Tue, 15 Oct 2024 16:25:58 +0200 Subject: [PATCH 4/8] gently nudging node towards performance breakdown --- .gitignore | 4 ++- src/index.ts | 45 +++++++++++++++++--------- src/response-segment-store.ts | 61 ++++++++++++++++++----------------- 3 files changed, 64 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index f2b5ea3..458e5a5 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,6 @@ coverage/ node_modules/ # Created by github pipeline -gha-creds-*.json \ No newline at end of file +gha-creds-*.json + +*.sqlite3 diff --git a/src/index.ts b/src/index.ts index 403143b..131bc49 100644 --- a/src/index.ts +++ b/src/index.ts @@ -271,7 +271,7 @@ function onMessage(state: State, ops: Ops) { } // determine if segment retransfer - if (msg.body.startsWith('resg-')) { + if (msg.body.startsWith('resg;')) { return onRetransferSegmentsReq(state, ops, msg); } @@ -367,23 +367,31 @@ function onInfoReq(state: State, ops: Ops, msg: Msg) { function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) { log.info('received retransfer segments req:', msg.body); - // resg-originPeerId-hops-requestId-segmentNrs - const [, recipient, hopsStr, requestId, rawSegNrs] = msg.body.split('-'); + // resg;originPeerId;hops;requestId;segmentNrs + const [, recipient, hopsStr, requestId, rawSegNrs] = msg.body.split(';'); const hops = parseInt(hopsStr, 10); const conn = { ...ops, hops }; - const segNrs = rawSegNrs.split(',').map(parseInt); + const segNrs = rawSegNrs.split(',').map((s) => parseInt(s)); + console.log('segNrs', segNrs); + log.verbose('reading segment nrs for %s from db: %o', requestId, segNrs); ResponseSegmentStore.all(state.db, requestId, segNrs) .then((segments) => { - segments.map((seg, idx) => { + segments.forEach((seg, idx) => { setTimeout(() => { + const segLog = Segment.prettyPrint(seg); + log.verbose('retransferring %s to %s', segLog, recipient); NodeApi.sendMessage(conn, { recipient, tag: msg.tag, message: Segment.toMessage(seg), - }).catch((err: Error) => { - log.error('error retransferring %s: %o', Segment.prettyPrint(seg), err); - }); + }) + .then((r) => { + log.verbose('retransfer response %s: %o', segLog, r); + }) + .catch((err: Error) => { + log.error('error retransferring %s: %o', Segment.prettyPrint(seg), err); + }); }, idx); }); }) @@ -572,17 +580,24 @@ function sendResponse( // queue segment sending for all of them const ts = Date.now(); segments.forEach((seg: Segment.Segment, idx) => { - setTimeout(function () { - ResponseSegmentStore.put(state.db, seg, ts); + const segLog = Segment.prettyPrint(seg); + log.verbose('putting %s into db at %d ', segLog, ts); + ResponseSegmentStore.put(state.db, seg, ts); + setTimeout(() => { + log.verbose('sending %s to %s', segLog, entryPeerId); NodeApi.sendMessage(conn, { recipient: entryPeerId, tag, message: Segment.toMessage(seg), - }).catch((err: Error) => { - log.error('error sending %s: %o', Segment.prettyPrint(seg), err); - // remove relay if it fails - state.relays = state.relays.filter((r) => r !== relay); - }); + }) + .then((r) => { + log.verbose('response %s: %o', segLog, r); + }) + .catch((err: Error) => { + log.error('error sending %s: %o', Segment.prettyPrint(seg), err); + // remove relay if it fails + state.relays = 
state.relays.filter((r) => r !== relay); + }); }, idx); }); diff --git a/src/response-segment-store.ts b/src/response-segment-store.ts index 00922de..bcbfc5f 100644 --- a/src/response-segment-store.ts +++ b/src/response-segment-store.ts @@ -53,15 +53,9 @@ export function put(db: DB, segment: Segment.Segment, insertedAt: number): Promi db.run( [ 'INSERT INTO response_segment_store (request_id, nr, total_count, body, inserted_at)', - 'VALUES ($requestId, $nr, $totalCount, $body, $insertedAt);', + 'VALUES (?, ?, ?, ?, ?);', ].join(' '), - { - $requestId: segment.requestId, - $nr: segment.nr, - $totalCount: segment.totalCount, - $body: segment.body, - $insertedAt: insertedAt, - }, + [segment.requestId, segment.nr, segment.totalCount, segment.body, insertedAt], (err) => { if (err) { return rej(`Error inserting segment into response_segment_store: ${err}`); @@ -73,29 +67,36 @@ export function put(db: DB, segment: Segment.Segment, insertedAt: number): Promi } export function all(db: DB, requestId: string, segmentNrs: number[]): Promise { + // manually handle IN operator behaviour + const placeholders = segmentNrs.map(() => '?').join(','); + const params: Record = { 1: requestId }; + for (let i = 0; i < segmentNrs.length; i++) { + // sql indexes start at 1 - which was already used for requestId + params[i + 2] = segmentNrs[i]; + } + const select = [ + 'SELECT nr, body, total_count FROM response_segment_store', + `WHERE request_id = ? and nr in (${placeholders})`, + ].join(' '); return new Promise((res, rej) => { - db.all( - [ - 'SELECT nr, body, total_count FROM response_segment_store', - 'WHERE request_id = $requestId and nr in $nrs', - ].join(' '), - { $requestId: requestId, $nrs: segmentNrs }, - function (err, rows) { - if (err) { - return rej(`Error selecting segment from response_segment_store: ${err}`); - } - const segments = rows.map((rawRow) => { - const row = rawRow as { nr: number; total_count: number; body: string }; - return { - requestId, - nr: row.nr, - totalCount: row.total_count, - body: row.body, - }; - }); - return res(segments); - }, - ); + db.all(select, params, function (err: any, rows: any) { + if (err) { + return rej(`Error selecting segment from response_segment_store: ${err}`); + } + if (!rows) { + return rej('Unable to find any segments matching missing segments request'); + } + const segments = rows.map((rawRow: any) => { + const row = rawRow as { nr: number; total_count: number; body: string }; + return { + requestId, + nr: row.nr, + totalCount: row.total_count, + body: row.body, + }; + }); + return res(segments); + }); }); } From 06d2f1447d932dc18d4f5449931034e873b10eb8 Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Tue, 15 Oct 2024 20:37:49 +0200 Subject: [PATCH 5/8] scheduled more lenient transfer --- src/index.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/index.ts b/src/index.ts index 131bc49..5b1fc9c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -191,7 +191,9 @@ function removeExpiredRequests(state: State) { } function scheduleRemoveExpiredSegmentResponses(state: State) { - setTimeout(function () { + const logM = ValidResponseSegmentPeriod / 1000 / 60; + log.info('scheduling next remove expired response segments in %dm', logM); + setTimeout(() => { removeExpiredSegmentResponses(state); }, ValidResponseSegmentPeriod); } @@ -372,7 +374,6 @@ function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) { const hops = parseInt(hopsStr, 10); const conn = { ...ops, hops }; const segNrs = rawSegNrs.split(',').map((s) => 
parseInt(s)); - console.log('segNrs', segNrs); log.verbose('reading segment nrs for %s from db: %o', requestId, segNrs); ResponseSegmentStore.all(state.db, requestId, segNrs) @@ -392,7 +393,7 @@ function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) { .catch((err: Error) => { log.error('error retransferring %s: %o', Segment.prettyPrint(seg), err); }); - }, idx); + }, idx * 10); }); }) .catch((err) => { @@ -598,7 +599,7 @@ function sendResponse( // remove relay if it fails state.relays = state.relays.filter((r) => r !== relay); }); - }, idx); + }, idx * 10); }); if (ops.discoveryPlatform) { From 28d63498801af8596fc99fb4317ea42f107265c7 Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Wed, 16 Oct 2024 10:48:20 +0200 Subject: [PATCH 6/8] updating to retransfer uhttp --- package.json | 2 +- src/index.ts | 46 ++++++++++++++++++++++++---------------------- yarn.lock | 8 ++++---- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/package.json b/package.json index 1a647b3..3fb780c 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "typescript": "^5.4.5" }, "dependencies": { - "@hoprnet/uhttp-lib": "^3.3.0", + "@hoprnet/uhttp-lib": "^3.7.2", "debug": "^4.3.4", "sqlite3": "^5.1.7", "ws": "^8.18.0" diff --git a/src/index.ts b/src/index.ts index 5b1fc9c..3ae5275 100644 --- a/src/index.ts +++ b/src/index.ts @@ -58,6 +58,7 @@ type Ops = { accessToken: string; discoveryPlatform?: OpsDP; dbFile: string; + pinnedFetch: typeof globalThis.fetch; }; type Msg = { @@ -104,7 +105,8 @@ async function setup(ops: Ops): Promise { } const { hopr: peerId } = resPeerId; - const cache = SegmentCache.init(); + // we don't care for missing segments reminder in our cache + const cache = SegmentCache.init(function () {}); const deleteTimer = new Map(); const logOpts: Record = { @@ -459,22 +461,19 @@ async function completeSegmentsEntry( // do actual endpoint request const fetchStartedAt = performance.now(); - const resFetch = await EndpointApi.fetchUrl(reqPayload.endpoint, reqPayload).catch( - (err: Error) => { - log.error( - 'error doing RPC req on %s with %o: %o', - reqPayload.endpoint, - reqPayload, - err, - ); - // HTTP critical fail response - const resp: Payload.RespPayload = { - type: Payload.RespType.Error, - reason: err.toString(), - }; - return sendResponse(sendParams, resp); - }, - ); + const resFetch = await EndpointApi.fetchUrl( + ops.pinnedFetch, + reqPayload.endpoint, + reqPayload, + ).catch((err: Error) => { + log.error('error doing RPC req on %s with %o: %o', reqPayload.endpoint, reqPayload, err); + // HTTP critical fail response + const resp: Payload.RespPayload = { + type: Payload.RespType.Error, + reason: err.toString(), + }; + return sendResponse(sendParams, resp); + }); if (!resFetch) { return; } @@ -604,8 +603,8 @@ function sendResponse( if (ops.discoveryPlatform) { reportToDiscoveryPlatform({ + ops, cacheEntry, - opsDP: ops.discoveryPlatform, reqPayload, segments, }); @@ -613,13 +612,13 @@ function sendResponse( } async function reportToDiscoveryPlatform({ + ops, cacheEntry, - opsDP, reqPayload, segments, }: { cacheEntry: SegmentCache.Entry; - opsDP: OpsDP; + ops: Ops; reqPayload: Payload.ReqPayload; segments: Segment.Segment[]; }) { @@ -649,9 +648,11 @@ async function reportToDiscoveryPlatform({ type: 'response', }; + const dp = ops.discoveryPlatform as OpsDP; const conn = { - discoveryPlatformEndpoint: opsDP.endpoint, - nodeAccessToken: opsDP.nodeAccessToken, + discoveryPlatformEndpoint: dp.endpoint, + nodeAccessToken: dp.nodeAccessToken, + pinnedFetch: 
ops.pinnedFetch, }; DpApi.postQuota(conn, quotaRequest).catch((err) => { log.error('error recording request quota: %o', err); @@ -723,5 +724,6 @@ if (require.main === module) { accessToken: process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN, discoveryPlatform, dbFile: process.env.UHTTP_EA_DATABASE_FILE, + pinnedFetch: globalThis.fetch.bind(globalThis), }); } diff --git a/yarn.lock b/yarn.lock index e60f4d0..ba69dd6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -355,10 +355,10 @@ "@noble/curves" "^1.3.0" "@noble/hashes" "^1.3.3" -"@hoprnet/uhttp-lib@^3.3.0": - version "3.3.0" - resolved "https://registry.yarnpkg.com/@hoprnet/uhttp-lib/-/uhttp-lib-3.3.0.tgz#11ab38f3da1bd6dca6ecc93e19e582fc827ab313" - integrity sha512-oFw+eCSO2uyaCmp48JazzyilPSZ8Z2SW7JAoq40YpKbiilG7rqWW+sdVWHegjqdN2RNJJnNFonXT+7UWdE0mSw== +"@hoprnet/uhttp-lib@^3.7.2": + version "3.7.2" + resolved "https://registry.yarnpkg.com/@hoprnet/uhttp-lib/-/uhttp-lib-3.7.2.tgz#ac97c51022b5298f020f502d00c8295f3fe3fdb3" + integrity sha512-aNUtnUsK367b8QDFODGmo2l6qLQgzTlFAk7yd1kNOGHBkjfL9ZVQIEn9oka1FiV4jnIBNBU+5+7F4ovZ5Yq2ug== dependencies: "@hoprnet/uhttp-crypto" "^1.0.1" debug "^4.3.4" From fb0d4f78d09c79126c066587aa4a66182999336c Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Wed, 16 Oct 2024 11:06:14 +0200 Subject: [PATCH 7/8] fix tests --- src/request-store.spec.ts | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/request-store.spec.ts b/src/request-store.spec.ts index 091eca6..729feb7 100644 --- a/src/request-store.spec.ts +++ b/src/request-store.spec.ts @@ -1,38 +1,40 @@ import * as reqStore from './request-store'; +import * as DB from './db'; -let store: reqStore.RequestStore; +let db: DB.DB; describe('request store', function () { beforeEach(async () => { // setup fluent database on disk - store = await reqStore.setup(''); + db = await DB.setup(''); + await reqStore.setup(db); }); - afterEach(async () => reqStore.close(store)); + afterEach(async () => DB.close(db)); it('addIfAbsent adds correctly', async function () { - const res = await reqStore.addIfAbsent(store, 'foobar', Date.now()); + const res = await reqStore.addIfAbsent(db, 'foobar', Date.now()); expect(res).toBe(reqStore.AddRes.Success); }); it('addIfAbsent detects duplicates', async function () { - await reqStore.addIfAbsent(store, 'foobar', Date.now()); - const res = await reqStore.addIfAbsent(store, 'foobar', Date.now()); + await reqStore.addIfAbsent(db, 'foobar', Date.now()); + const res = await reqStore.addIfAbsent(db, 'foobar', Date.now()); expect(res).toBe(reqStore.AddRes.Duplicate); }); it('removeExpired removes olderThan entries', async function () { const now = Date.now(); await Promise.all([ - reqStore.addIfAbsent(store, 'foobar1', now - 100), - reqStore.addIfAbsent(store, 'foobar2', now - 100), - reqStore.addIfAbsent(store, 'foobar3', now - 50), + reqStore.addIfAbsent(db, 'foobar1', now - 100), + reqStore.addIfAbsent(db, 'foobar2', now - 100), + reqStore.addIfAbsent(db, 'foobar3', now - 50), ]); - await reqStore.removeExpired(store, now - 50); + await reqStore.removeExpired(db, now - 50); const p = new Promise((res) => { - store.db.all('SELECT * from request_store', (err, rows) => { + db.all('SELECT * from request_store', (err, rows) => { expect(err).toBe(null); expect(rows).toHaveLength(1); const row = rows[0] as { uuid: string; counter: number }; From ee39273b79adfd96a24362c5b918947daebfd188 Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Wed, 16 Oct 2024 11:13:48 +0200 Subject: [PATCH 8/8] adjust docker build to 
current version
---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index e21dfba..70566d6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 # Specify platform to be amd because arm doesn't work
-FROM --platform=linux/amd64 node:18-alpine as builder
+FROM --platform=linux/amd64 node:20-alpine as builder
 
 RUN apk upgrade --no-cache