diff --git a/API.md b/API.md index 3180d383..a6dc3dd6 100644 --- a/API.md +++ b/API.md @@ -1,91 +1,5 @@ ## API -### Find transitive transfer steps - -Returns steps to transfer transitively through trust graph from one node to another. - -**Request:** - -`POST /api/transfers` - -**Parameters:** - -``` -{ - from: , - to: , - value: , - hops: , -} -``` - -- `from`: Sender address -- `to`: Receiver address -- `value`: Amount of Freckles to send between sender and receiver (the fractional monetary unit of Circles is named Freckles. One Circle = 1,000,000,000,000,000,000 Freckles (1018)) -- `hops` (optional): pathfinder2 parameter used to limit the area around the sender that is explored; the maximal "chain length" - -**Response:** - -``` -{ - status: 'ok', - data: { - from: , - to: , - maxFlowValue: , - transferSteps: , - transferValue: , - statistics: - } -} -``` - -**Errors:** - -- `400` Parameters missing or malformed -- `422` Invalid transfer - -### Update transitive transfer steps - -Updates the steps of a transitive transfer. - -**Request:** - -`POST /api/transfers/update` - -**Parameters:** - -``` -{ - from: , - to: , - value: , - hops: , -} -``` - -- `from`: Sender address -- `to`: Receiver address -- `value`: Amount of Freckles intended to be sent between sender and receiver (the fractional monetary unit of Circles is named Freckles. One Circle = 1,000,000,000,000,000,000 Freckles (1018)) -- `hops` (optional): pathfinder2 parameter used to limit the area around the sender that is explored; the maximal "chain length" - -**Response:** - -``` -{ - status: 'ok', - data: { - updated: - } -} -``` - -- `updated`: Whether all the steps have been successfully updated - -**Errors:** - -- `400` Parameters missing or malformed - ### Store transfer meta data Stores meta data like payment note connected to a made transfer. This data is only readable for sender or receiver of that transaction. @@ -515,4 +429,4 @@ Resolves multiple news items starting from newest item in the database, via: **Errors:** -When no news were found an empty response will be returned. \ No newline at end of file +When no news were found an empty response will be returned. diff --git a/package-lock.json b/package-lock.json index a75224cf..4b07bb5e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15380,6 +15380,7 @@ "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.5.tgz", "integrity": "sha512-HTm14iMQKK2FjFLRTM5lAVcyaUzOnqbPtesFIvREgXpJHdQm8bWS+GkQgIkfaBYRHuCnea7w8UVNfwiAQhlr9A==", "dev": true, + "hasInstallScript": true, "optional": true, "dependencies": { "node-gyp-build": "^4.3.0" @@ -15737,6 +15738,7 @@ "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.7.tgz", "integrity": "sha512-vLt1O5Pp+flcArHGIyKEQq883nBt8nN8tVBcoL0qUXj2XT1n7p70yGIq2VK98I5FdZ1YHc0wk/koOnHjnXWk1Q==", "dev": true, + "hasInstallScript": true, "optional": true, "dependencies": { "node-gyp-build": "^4.3.0" diff --git a/src/constants.js b/src/constants.js deleted file mode 100644 index 023b40ee..00000000 --- a/src/constants.js +++ /dev/null @@ -1,10 +0,0 @@ -import path from 'path'; - -export const ZERO_ADDRESS = '0x0000000000000000000000000000000000000000'; - -export const BASE_PATH = path.join(__dirname, '..'); -export const EDGES_DIRECTORY_PATH = path.join(BASE_PATH, 'edges-data'); -export const EDGES_FILE_PATH = path.join(EDGES_DIRECTORY_PATH, 'edges.csv'); -export const PATHFINDER_FILE_PATH = path.join(BASE_PATH, 'pathfinder'); - -export const HOPS_DEFAULT = '3'; diff --git a/src/controllers/transfers.js b/src/controllers/transfers.js deleted file mode 100644 index 83bc6c44..00000000 --- a/src/controllers/transfers.js +++ /dev/null @@ -1,164 +0,0 @@ -import httpStatus from 'http-status'; - -import APIError from '../helpers/errors'; -import Transfer from '../models/transfers'; -import transferSteps from '../services/findTransferSteps'; -import updatePath from '../services/updateTransferSteps'; -import { checkFileExists } from '../services/edgesFile'; -import { checkSignature } from '../helpers/signature'; -import { requestGraph } from '../services/graph'; -import { respondWithSuccess } from '../helpers/responses'; - -function prepareTransferResult(response) { - return { - id: response.id, - from: response.from, - to: response.to, - transactionHash: response.transactionHash, - paymentNote: response.paymentNote, - }; -} - -async function checkIfExists(transactionHash) { - const response = await Transfer.findOne({ - where: { - transactionHash, - }, - }); - - if (response) { - throw new APIError(httpStatus.CONFLICT, 'Entry already exists'); - } -} - -export default { - createNewTransfer: async (req, res, next) => { - const { address, signature, data } = req.body; - const { from, to, transactionHash, paymentNote } = data; - - try { - // Check signature - if (!checkSignature([from, to, transactionHash], signature, address)) { - throw new APIError(httpStatus.FORBIDDEN, 'Invalid signature'); - } - - // Check if entry already exists - await checkIfExists(transactionHash); - } catch (err) { - return next(err); - } - - // Everything is fine, create entry! - Transfer.create({ - from, - to, - paymentNote, - transactionHash, - }) - .then(() => { - respondWithSuccess(res, null, httpStatus.CREATED); - }) - .catch((err) => { - next(err); - }); - }, - - getByTransactionHash: async (req, res, next) => { - const { transactionHash } = req.params; - const { address, signature } = req.body; - let safeAddresses = []; - - // Check signature - try { - if (!checkSignature([transactionHash], signature, address)) { - throw new APIError(httpStatus.FORBIDDEN, 'Invalid signature'); - } - - // Check if signer ownes the claimed safe address - const query = `{ - user(id: "${address.toLowerCase()}") { - safeAddresses - } - }`; - - const data = await requestGraph(query); - - if (!data || !data.user) { - throw new APIError(httpStatus.FORBIDDEN, 'Not allowed'); - } - - safeAddresses = data.user.safeAddresses; - } catch (err) { - return next(err); - } - - Transfer.findOne({ - where: { - transactionHash, - }, - }) - .then((response) => { - if (!response) { - next(new APIError(httpStatus.NOT_FOUND)); - } else if ( - // Check if user is either sender or receiver - !safeAddresses.includes(response.from.toLowerCase()) && - !safeAddresses.includes(response.to.toLowerCase()) - ) { - next(new APIError(httpStatus.FORBIDDEN, 'Not allowed')); - } else { - respondWithSuccess(res, prepareTransferResult(response)); - } - }) - .catch((err) => { - next(err); - }); - }, - - findTransferSteps: async (req, res, next) => { - if (!checkFileExists()) { - next( - new APIError( - httpStatus.SERVICE_UNAVAILABLE, - 'Trust network file does not exist', - ), - ); - } - - try { - const result = await transferSteps({ - ...req.body, - }); - - respondWithSuccess(res, result); - } catch (error) { - next(new APIError(httpStatus.UNPROCESSABLE_ENTITY, error.message)); - } - }, - - updateTransferSteps: async (req, res, next) => { - if (!checkFileExists()) { - next( - new APIError( - httpStatus.SERVICE_UNAVAILABLE, - 'Trust network file does not exist', - ), - ); - } - - try { - const result = await updatePath({ - ...req.body, - }); - - respondWithSuccess(res, result); - } catch (error) { - next(new APIError(httpStatus.UNPROCESSABLE_ENTITY, error.message)); - } - }, - - getMetrics: async (req, res) => { - // @DEPRECATED - respondWithSuccess(res); - }, -}; diff --git a/src/database/migrations/20201008130741-create-edges.js b/src/database/migrations/20201008130741-create-edges.js deleted file mode 100644 index 2642ab11..00000000 --- a/src/database/migrations/20201008130741-create-edges.js +++ /dev/null @@ -1,39 +0,0 @@ -module.exports = { - up: (queryInterface, Sequelize) => { - return queryInterface.createTable('edges', { - id: { - type: Sequelize.INTEGER, - primaryKey: true, - autoIncrement: true, - }, - createdAt: { - type: Sequelize.DATE, - }, - updatedAt: { - type: Sequelize.DATE, - }, - from: { - type: Sequelize.STRING(42), - allowNull: false, - unique: 'edges_unique', - }, - to: { - type: Sequelize.STRING(42), - allowNull: false, - unique: 'edges_unique', - }, - token: { - type: Sequelize.STRING(42), - allowNull: false, - unique: 'edges_unique', - }, - capacity: { - type: Sequelize.STRING, - allowNull: false, - }, - }); - }, - down: (queryInterface) => { - return queryInterface.dropTable('edges'); - }, -}; diff --git a/src/database/migrations/20201012092801-create-metrics.js b/src/database/migrations/20201012092801-create-metrics.js deleted file mode 100644 index 3463139d..00000000 --- a/src/database/migrations/20201012092801-create-metrics.js +++ /dev/null @@ -1,34 +0,0 @@ -module.exports = { - up: (queryInterface, Sequelize) => { - return queryInterface.createTable('metrics', { - id: { - type: Sequelize.INTEGER, - primaryKey: true, - autoIncrement: true, - }, - createdAt: { - type: Sequelize.DATE, - }, - updatedAt: { - type: Sequelize.DATE, - }, - category: { - type: Sequelize.STRING, - allowNull: false, - unique: 'metrics_unique', - }, - name: { - type: Sequelize.STRING, - allowNull: false, - unique: 'metrics_unique', - }, - value: { - type: Sequelize.BIGINT, - allowNull: false, - }, - }); - }, - down: (queryInterface) => { - return queryInterface.dropTable('metrics'); - }, -}; diff --git a/src/database/migrations/20201021125825-add-index-to-edges.js b/src/database/migrations/20201021125825-add-index-to-edges.js deleted file mode 100644 index c14c1185..00000000 --- a/src/database/migrations/20201021125825-add-index-to-edges.js +++ /dev/null @@ -1,11 +0,0 @@ -module.exports = { - up: async (queryInterface) => { - await queryInterface.addIndex('edges', ['from', 'to', 'token'], { - name: 'edges_unique', - unique: true, - }); - }, - down: async (queryInterface) => { - await queryInterface.removeIndex('edges', ['from', 'to', 'token']); - }, -}; diff --git a/src/database/seeders/20201012093423-create-transfer-metrics.js b/src/database/seeders/20201012093423-create-transfer-metrics.js deleted file mode 100644 index d886eed6..00000000 --- a/src/database/seeders/20201012093423-create-transfer-metrics.js +++ /dev/null @@ -1,29 +0,0 @@ -module.exports = { - up: async (queryInterface) => { - await queryInterface.bulkInsert( - 'metrics', - [ - 'countEdges', - 'countSafes', - 'countTokens', - 'edgesLastAdded', - 'edgesLastRemoved', - 'edgesLastUpdated', - 'lastBlockNumber', - 'lastUpdateAt', - 'lastUpdateDuration', - ].map((name) => { - return { - category: 'transfers', - name, - value: 0, - }; - }), - {}, - ); - }, - - down: async (queryInterface) => { - await queryInterface.bulkDelete('metrics', null, {}); - }, -}; diff --git a/src/helpers/loop.js b/src/helpers/loop.js deleted file mode 100644 index d29346ca..00000000 --- a/src/helpers/loop.js +++ /dev/null @@ -1,25 +0,0 @@ -const LOOP_INTERVAL = 5000; -const MAX_ATTEMPTS = 12; - -export default async function loop(request, condition) { - return new Promise((resolve, reject) => { - let attempt = 0; - - const interval = setInterval(async () => { - try { - const response = await request(); - attempt += 1; - - if (condition(response)) { - clearInterval(interval); - resolve(response); - } else if (attempt > MAX_ATTEMPTS) { - throw new Error('Too many attempts'); - } - } catch (error) { - clearInterval(interval); - reject(error); - } - }, LOOP_INTERVAL); - }); -} diff --git a/src/models/edges.js b/src/models/edges.js deleted file mode 100644 index d0abbe65..00000000 --- a/src/models/edges.js +++ /dev/null @@ -1,44 +0,0 @@ -import Sequelize from 'sequelize'; - -import db from '../database'; - -const Edge = db.define( - 'edges', - { - id: { - type: Sequelize.INTEGER, - primaryKey: true, - autoIncrement: true, - }, - from: { - type: Sequelize.STRING(42), - allowNull: false, - unique: 'edges_unique', - }, - to: { - type: Sequelize.STRING(42), - allowNull: false, - unique: 'edges_unique', - }, - token: { - type: Sequelize.STRING(42), - allowNull: false, - unique: 'edges_unique', - }, - capacity: { - type: Sequelize.STRING, - allowNull: false, - }, - }, - { - indexes: [ - { - name: 'edges_unique', - unique: true, - fields: ['from', 'to', 'token'], - }, - ], - }, -); - -export default Edge; diff --git a/src/routes/index.js b/src/routes/index.js index c0ce7974..8bf9a76a 100644 --- a/src/routes/index.js +++ b/src/routes/index.js @@ -3,7 +3,6 @@ import httpStatus from 'http-status'; import APIError from '../helpers/errors'; import newsRouter from './news'; -import transfersRouter from './transfers'; import uploadsRouter from './uploads'; import usersRouter from './users'; import { respondWithSuccess } from '../helpers/responses'; @@ -15,7 +14,6 @@ router.get('/', (req, res) => { }); router.use('/news', newsRouter); -router.use('/transfers', transfersRouter); router.use('/uploads', uploadsRouter); router.use('/users', usersRouter); diff --git a/src/routes/transfers.js b/src/routes/transfers.js deleted file mode 100644 index d6f4fa20..00000000 --- a/src/routes/transfers.js +++ /dev/null @@ -1,35 +0,0 @@ -import express from 'express'; - -import transfersController from '../controllers/transfers'; -import transfersValidation from '../validations/transfers'; -import validate from '../helpers/validate'; - -const router = express.Router(); - -router.put( - '/', - validate(transfersValidation.createNewTransfer), - transfersController.createNewTransfer, -); - -router.post( - '/update', - validate(transfersValidation.findTransferSteps), - transfersController.updateTransferSteps, -); - -router.post( - '/:transactionHash', - validate(transfersValidation.getByTransactionHash), - transfersController.getByTransactionHash, -); - -router.post( - '/', - validate(transfersValidation.findTransferSteps), - transfersController.findTransferSteps, -); - -router.get('/status', transfersController.getMetrics); - -export default router; diff --git a/src/services/edgesDatabase.js b/src/services/edgesDatabase.js deleted file mode 100644 index 953d2c86..00000000 --- a/src/services/edgesDatabase.js +++ /dev/null @@ -1,41 +0,0 @@ -import Edge from '../models/edges'; - -export async function upsertEdge(edge) { - if (edge.capacity.toString() === '0') { - return destroyEdge(edge); - } else { - return Edge.upsert(edge, { - where: { - token: edge.token, - from: edge.from, - to: edge.to, - }, - }); - } -} - -export async function destroyEdge(edge) { - return Edge.destroy({ - where: { - token: edge.token, - from: edge.from, - to: edge.to, - }, - }); -} - -export async function queryEdges(where) { - return await Edge.findAll({ - where, - order: [['from', 'ASC']], - raw: true, - }); -} - -export async function getStoredEdges({ hasOnlyFileFields = false } = {}) { - return await Edge.findAll({ - attributes: hasOnlyFileFields ? ['from', 'to', 'token', 'capacity'] : null, - order: [['from', 'ASC']], - raw: true, - }); -} diff --git a/src/services/edgesFile.js b/src/services/edgesFile.js deleted file mode 100644 index 98858556..00000000 --- a/src/services/edgesFile.js +++ /dev/null @@ -1,73 +0,0 @@ -import path from 'path'; -import web3 from './web3'; -import fs from 'fs'; -import { execSync } from 'child_process'; -import copyTo from 'pg-copy-streams/copy-to'; - -import db from '../database'; -import { EDGES_FILE_PATH, EDGES_DIRECTORY_PATH } from '../constants'; - -// Export from PostgresDB to CSV file -async function exportCSV(file) { - return new Promise((resolve, reject) => { - db.connectionManager - .getConnection() - .then((client) => { - const output = client.query( - copyTo( - `COPY edges ("from", "to", "token", "capacity") TO STDOUT WITH (FORMAT CSV)`, - ), - ); - let data = ''; - const stream = fs.createWriteStream(file); - stream.setDefaultEncoding('utf8'); - stream.on('error', (err) => { - db.connectionManager.releaseConnection(client); - reject('Error in DB connection. Error:', err); - }); - - stream.on('finish', () => { - db.connectionManager.releaseConnection(client); - resolve(data); - }); - output.on('data', (chunk) => (data += chunk)).pipe(stream); - }) - .catch((err) => { - reject(err); - }); - }); -} - -export function checkFileExists() { - return fs.existsSync(EDGES_FILE_PATH); -} - -// Store edges into .csv file for pathfinder -export async function writeToFile( - tmpFileKey = web3.utils.randomHex(16).slice(2), -) { - // Create temporary file path first - const tmpFilePath = path.join( - EDGES_DIRECTORY_PATH, - `edges-tmp-${tmpFileKey}.csv`, - ); - try { - // Check if `edges-data` folder exists and create it otherwise - if (!fs.existsSync(EDGES_DIRECTORY_PATH)) { - fs.mkdirSync(EDGES_DIRECTORY_PATH); - } - // Create empty file - fs.closeSync(fs.openSync(tmpFilePath, 'w')); - await exportCSV(tmpFilePath); - fs.renameSync(tmpFilePath, EDGES_FILE_PATH); - // count lines of final csv - return parseInt(execSync(`wc -l ${EDGES_FILE_PATH} | awk '{ print $1 }'`)); - } catch (error) { - try { - fs.unlinkSync(tmpFilePath); - } catch (err) { - throw new Error('Could not delete temporary csv file. Error:' + err); - } - throw new Error('Could not create csv file. Error:' + error); - } -} diff --git a/src/services/edgesFromEvents.js b/src/services/edgesFromEvents.js deleted file mode 100644 index 681f677b..00000000 --- a/src/services/edgesFromEvents.js +++ /dev/null @@ -1,221 +0,0 @@ -import HubContract from '@circles/circles-contracts/build/contracts/Hub.json'; - -import EdgeUpdateManager from './edgesUpdate'; -import logger from '../helpers/logger'; -import web3 from './web3'; -import { ZERO_ADDRESS } from '../constants'; -import { queryEdges } from './edgesDatabase'; -import { requestGraph } from './graph'; - -const hubContract = new web3.eth.Contract( - HubContract.abi, - process.env.HUB_ADDRESS, -); - -function addressesFromTopics(topics) { - return [ - web3.utils.toChecksumAddress(`0x${topics[1].slice(26)}`), - web3.utils.toChecksumAddress(`0x${topics[2].slice(26)}`), - ]; -} - -async function requestSafe(safeAddress) { - const query = `{ - safe(id: "${safeAddress.toLowerCase()}") { - outgoing { - canSendToAddress - userAddress - } - incoming { - canSendToAddress - userAddress - } - } - }`; - - const data = await requestGraph(query); - - if (!data || !('safe' in data)) { - throw new Error(`Could not fetch graph data for Safe ${safeAddress}`); - } - - return data.safe; -} - -async function updateAllWhoTrustToken( - { tokenOwner, tokenAddress, address, tokenOwnerData }, - edgeUpdateManager, -) { - logger.info( - `Found ${tokenOwnerData.outgoing.length} outgoing addresses while processing job for Safe ${tokenOwner}`, - ); - - await Promise.all( - tokenOwnerData.outgoing.map(async (trustObject) => { - const canSendToAddress = web3.utils.toChecksumAddress( - trustObject.canSendToAddress, - ); - - // address -> canSendToAddress - await edgeUpdateManager.updateEdge( - { - token: tokenOwner, - from: address, - to: canSendToAddress, - }, - tokenAddress, - ); - }), - ); -} - -export async function processTransferEvent(data) { - const edgeUpdateManager = new EdgeUpdateManager(); - const [sender, recipient] = addressesFromTopics(data.topics); - - const tokenAddress = web3.utils.toChecksumAddress(data.tokenAddress); - logger.info(`Processing transfer for ${tokenAddress}`); - - const tokenOwner = await hubContract.methods.tokenToUser(tokenAddress).call(); - if (tokenOwner === ZERO_ADDRESS) { - logger.info(`${tokenAddress} is not a Circles token`); - return false; - } - - // a) Update the edge between the `recipient` safe and the `tokenOwner` safe. - // The limit will increase here as the `recipient` will get tokens the - // `tokenOwner` accepts. This update will be ignored if the `tokenOwner` is - // also the `recipient`, if the recipient is the relayer, or if the recipient - // is the zero address - await edgeUpdateManager.updateEdge( - { - token: tokenOwner, - from: recipient, - to: tokenOwner, - }, - tokenAddress, - ); - - // b) Update the edge between the `sender` safe and the `tokenOwner` safe. - // The limit will decrease here as the `sender` will lose tokens the - // `tokenOwner` accepts. This update will be ignored if the `tokenOwner` is - // also the `sender`, or if the sender is the zero address - await edgeUpdateManager.updateEdge( - { - token: tokenOwner, - from: sender, - to: tokenOwner, - }, - tokenAddress, - ); - - // Get more information from the graph about the current trust connections of - // `tokenOwner` - let tokenOwnerData; - try { - tokenOwnerData = await requestSafe(tokenOwner); - } catch (err) { - logger.error(`Safe ${tokenOwner} for job is not registered in graph`); - logger.error(err); - return; - } - - // c) Go through everyone who trusts this token, and update the limit from - // the `recipient` to them. The recipient's capacity increases for this token - // to everyone who trusts the tokenOwner - await updateAllWhoTrustToken( - { - address: recipient, - tokenAddress, - tokenOwner, - tokenOwnerData, - }, - edgeUpdateManager, - ); - - // d) Go through everyone who trusts this token, and update the limit from - // the `sender` to them. The sender's capacity decreases for this token to - // everyone who trusts the tokenOwner - await updateAllWhoTrustToken( - { - address: sender, - tokenAddress, - tokenOwner, - tokenOwnerData, - }, - edgeUpdateManager, - ); - - // e) If someone is sending or receiving their own token, the balance of their own - // token has changed, and therefore the trust limits for all the tokens they accept - // must be updated - if (sender === tokenOwner || recipient === tokenOwner) { - await Promise.all( - tokenOwnerData.incoming.map(async (trustObject) => { - const userTokenAddress = await hubContract.methods - .userToToken(trustObject.userAddress) - .call(); - - if (tokenAddress === ZERO_ADDRESS) { - logger.info(`${sender} is not a Circles user`); - return; - } - - const user = web3.utils.toChecksumAddress(trustObject.userAddress); - return edgeUpdateManager.updateEdge( - { - token: user, - from: user, - to: tokenOwner, - }, - userTokenAddress, - ); - }), - ); - } - - return true; -} - -export async function processTrustEvent(data) { - const edgeUpdateManager = new EdgeUpdateManager(); - const [truster, tokenOwner] = addressesFromTopics(data.topics); - - logger.info(`Processing trust for ${truster}`); - - const tokenAddress = await hubContract.methods.userToToken(tokenOwner).call(); - if (tokenAddress === ZERO_ADDRESS) { - logger.info(`${tokenOwner} is not a Circles user`); - return false; - } - - // a) Update the edge between `tokenOwner` and the `truster`, as the latter - // accepts their token now - await edgeUpdateManager.updateEdge( - { - token: tokenOwner, - from: tokenOwner, - to: truster, - }, - tokenAddress, - ); - - // b) Go through everyone else who holds this token, and update the path - // from the `truster` to them as well, as they can send this token to the - // `truster` - const tokenholders = await queryEdges({ to: tokenOwner, token: tokenOwner }); - await Promise.all( - tokenholders.map(async (edge) => { - await edgeUpdateManager.updateEdge( - { - token: tokenOwner, - from: edge.from, - to: truster, - }, - tokenAddress, - ); - }), - ); - - return true; -} diff --git a/src/services/edgesFromGraph.js b/src/services/edgesFromGraph.js deleted file mode 100644 index 80d554bc..00000000 --- a/src/services/edgesFromGraph.js +++ /dev/null @@ -1,436 +0,0 @@ -import HubContract from '@circles/circles-contracts/build/contracts/Hub.json'; -import TokenContract from '@circles/circles-contracts/build/contracts/Token.json'; -import fastJsonStringify from 'fast-json-stringify'; -import findTransferSteps from '@circles/transfer'; -import fs from 'fs'; -import { performance } from 'perf_hooks'; - -import Edge from '../models/edges'; -import logger from '../helpers/logger'; -import fetchAllFromGraph from './graph'; -import web3 from './web3'; -import { getMetrics, setMetrics } from './metrics'; -import { minNumberString } from '../helpers/compare'; -import { - EDGES_FILE_PATH, - EDGES_TMP_FILE_PATH, - HOPS_DEFAULT, - PATHFINDER_FILE_PATH, -} from '../constants'; - -const METRICS_TRANSFERS = 'transfers'; - -const DEFAULT_PROCESS_TIMEOUT = 1000 * 10; - -const hubContract = new web3.eth.Contract( - HubContract.abi, - process.env.HUB_ADDRESS, -); - -const stringify = fastJsonStringify({ - title: 'Circles Edges Schema', - type: 'array', - properties: { - from: { - type: 'string', - }, - to: { - type: 'string', - }, - token: { - type: 'string', - }, - capacity: { - type: 'string', - }, - }, -}); - -const findToken = (tokens, tokenAddress) => { - return tokens.find((token) => token.address === tokenAddress); -}; - -const findSafe = (safes, safeAddress) => { - return safes.find((safe) => safe.address === safeAddress); -}; - -const findConnection = (connections, userAddress, canSendToAddress) => { - return connections.find((edge) => { - return ( - edge.canSendToAddress === canSendToAddress && - edge.userAddress === userAddress - ); - }); -}; - -export const safeQuery = `{ - canSendToAddress - userAddress -}`; - -export const safeFields = ` - id - outgoing ${safeQuery} - incoming ${safeQuery} - balances { - token { - id - owner { - id - } - } - } -`; - -export async function getTrustNetworkEdges() { - // Methods to parse the data we get to break all down into given safe - // addresses, the tokens they own, the trust connections they have between - // each other and finally a list of all tokens. - const connections = []; - const safes = []; - const tokens = []; - - const addConnection = (userAddress, canSendToAddress) => { - connections.push({ - canSendToAddress, - userAddress, - isExtended: false, - }); - }; - - const addConnections = (newConnections) => { - newConnections.forEach((connection) => { - const userAddress = web3.utils.toChecksumAddress(connection.userAddress); - const canSendToAddress = web3.utils.toChecksumAddress( - connection.canSendToAddress, - ); - - if (!findConnection(connections, userAddress, canSendToAddress)) { - addConnection(userAddress, canSendToAddress); - } - }); - }; - - const addToken = (address, safeAddress) => { - tokens.push({ - address, - safeAddress, - }); - }; - - const addSafe = (safeAddress, balances) => { - const safe = balances.reduce( - (acc, { token }) => { - const tokenAddress = web3.utils.toChecksumAddress(token.id); - const tokenSafeAddress = web3.utils.toChecksumAddress(token.owner.id); - - acc.tokens.push({ - address: tokenAddress, - }); - - if (!findToken(tokens, tokenAddress)) { - addToken(tokenAddress, tokenSafeAddress); - } - - return acc; - }, - { - address: web3.utils.toChecksumAddress(safeAddress), - tokens: [], - }, - ); - - safes.push(safe); - }; - - const response = await fetchAllFromGraph('safes', safeFields); - - response.forEach((safe) => { - if (!findSafe(safes, safe.id)) { - addSafe(safe.id, safe.balances); - - addConnections(safe.outgoing); - addConnections(safe.incoming); - } - }); - - return { - statistics: { - safes: safes.length, - connections: connections.length, - tokens: tokens.length, - }, - edges: findEdgesInGraphData({ - connections, - safes, - tokens, - }), - }; -} - -export function findEdgesInGraphData({ connections, safes, tokens }) { - const edges = []; - - // Find tokens for each connection we can actually use for transitive - // transactions - const checkedEdges = {}; - - const getKey = (from, to, token) => { - return [from, to, token].join(''); - }; - - const addEdge = ({ from, to, tokenAddress, tokenOwner }) => { - // Ignore sending to ourselves - if (from === to) { - return; - } - - // Ignore duplicates - const key = getKey(from, to, tokenOwner); - if (checkedEdges[key]) { - return; - } - checkedEdges[key] = true; - - edges.push({ - from, - to, - tokenAddress, - tokenOwner, - }); - }; - - connections.forEach((connection) => { - const senderSafeAddress = connection.userAddress; - const receiverSafeAddress = connection.canSendToAddress; - - // Get the senders Safe - const senderSafe = findSafe(safes, senderSafeAddress); - - if (!senderSafe) { - return; - } - - // Get tokens the sender owns - const senderTokens = senderSafe.tokens; - - // Which of them are trusted by the receiving node? - const trustedTokens = senderTokens.reduce( - (tokenAcc, { address, balance }) => { - const token = findToken(tokens, address); - - const tokenConnection = connections.find( - ({ limit, userAddress, canSendToAddress }) => { - if (!limit) { - return false; - } - - // Calculate what maximum token value we can send. We use this - // special string comparison method as using BN instances affects - // performance significantly - const capacity = minNumberString(limit, balance); - return ( - userAddress === token.safeAddress && - canSendToAddress === receiverSafeAddress && - capacity !== '0' - ); - }, - ); - - if (tokenConnection) { - tokenAcc.push({ - tokenAddress: token.address, - tokenOwner: token.safeAddress, - }); - } - return tokenAcc; - }, - [], - ); - // Merge all known data to get a list in the end containing what Token can - // be sent to whom with what maximum value. - trustedTokens.reduce((acc, trustedToken) => { - // Ignore sending to ourselves - if (senderSafeAddress === receiverSafeAddress) { - return; - } - - // Ignore duplicates - const key = getKey( - senderSafeAddress, - receiverSafeAddress, - trustedToken.token, - ); - - if (checkedEdges[key]) { - return; - } - - checkedEdges[key] = true; - - acc.push({ - from: senderSafeAddress, - to: receiverSafeAddress, - tokenAddress: trustedToken.tokenAddress, - tokenOwner: trustedToken.tokenOwner, - }); - - return acc; - }, []); - }); - - // Add connections between token owners and the original safe of the token as - // they might not be represented by trust connections (for example when an - // organization owns tokens it can still send them even though noone trusts - // the organization) - safes.forEach(({ address, tokens: ownedTokens }) => { - ownedTokens.forEach(({ address: tokenAddress }) => { - const token = findToken(tokens, tokenAddress); - - connections.forEach((connection) => { - if (connection.userAddress === token.safeAddress) { - addEdge({ - from: address, - to: connection.canSendToAddress, - tokenAddress, - tokenOwner: token.safeAddress, - }); - } - }); - }); - }); - - return edges; -} - -export async function upsert(edge) { - if (edge.capacity.toString() === '0') { - return Edge.destroy({ - where: { - token: edge.token, - from: edge.from, - to: edge.to, - }, - }); - } else { - return Edge.upsert(edge, { - where: { - token: edge.token, - from: edge.from, - to: edge.to, - }, - }); - } -} - -export async function updateEdge(edge, tokenAddress) { - // Ignore self-trust - if (edge.from === edge.to) { - return; - } - - try { - // Get send limit - const limit = await hubContract.methods - .checkSendLimit(edge.token, edge.from, edge.to) - .call(); - - // Get Token balance - const tokenContract = new web3.eth.Contract( - TokenContract.abi, - tokenAddress, - ); - const balance = await tokenContract.methods.balanceOf(edge.from).call(); - - // Update edge capacity - edge.capacity = minNumberString(limit, balance); - - await upsert(edge); - } catch (error) { - logger.error( - `Found error with checking sending limit for token of ${edge.token} from ${edge.from} to ${edge.to} [${error}]`, - ); - - await Edge.destroy({ - where: { - token: edge.token, - from: edge.from, - to: edge.to, - }, - }); - } -} - -export async function setTransferMetrics(metrics) { - return await setMetrics(METRICS_TRANSFERS, metrics); -} - -export async function getTransferMetrics() { - return await getMetrics(METRICS_TRANSFERS); -} - -export async function getStoredEdges(isWithAttributes = false) { - return await Edge.findAll({ - attributes: isWithAttributes ? ['from', 'to', 'token', 'capacity'] : null, - order: [['from', 'ASC']], - raw: true, - }); -} - -export function checkFileExists() { - return fs.existsSync(EDGES_FILE_PATH); -} - -// Store edges into .json file for pathfinder executable -export async function writeToFile(edges) { - return new Promise((resolve, reject) => { - fs.writeFile(EDGES_TMP_FILE_PATH, stringify(edges), (error) => { - if (error) { - reject( - new Error(`Could not write to ${EDGES_TMP_FILE_PATH} file: ${error}`), - ); - } else { - fs.renameSync(EDGES_TMP_FILE_PATH, EDGES_FILE_PATH); - resolve(); - } - }); - }); -} - -export async function transferSteps({ from, to, value, hops = HOPS_DEFAULT }) { - if (from === to) { - throw new Error('Can not send to yourself'); - } - - const startTime = performance.now(); - - const result = await findTransferSteps( - { - from, - to, - value, - hops: hops.toString(), - }, - { - edgesFile: EDGES_FILE_PATH, - pathfinderExecutable: PATHFINDER_FILE_PATH, - timeout: process.env.TRANSFER_STEPS_TIMEOUT || DEFAULT_PROCESS_TIMEOUT, - }, - ); - - const endTime = performance.now(); - - return { - from, - to, - maxFlowValue: result.maxFlowValue, - processDuration: Math.round(endTime - startTime), - transferValue: value, - transferSteps: result.transferSteps.map(({ token, ...step }) => { - return { - ...step, - tokenOwnerAddress: token, - }; - }), - }; -} diff --git a/src/services/edgesUpdate.js b/src/services/edgesUpdate.js deleted file mode 100644 index 13f99963..00000000 --- a/src/services/edgesUpdate.js +++ /dev/null @@ -1,79 +0,0 @@ -import HubContract from '@circles/circles-contracts/build/contracts/Hub.json'; -import TokenContract from '@circles/circles-contracts/build/contracts/Token.json'; - -import logger from '../helpers/logger'; -import web3 from './web3'; - -import { minNumberString } from '../helpers/compare'; -import { upsertEdge } from './edgesDatabase'; -import { ZERO_ADDRESS } from '../constants'; - -const hubContract = new web3.eth.Contract( - HubContract.abi, - process.env.HUB_ADDRESS, -); - -const getKey = (from, to, token) => { - return [from, to, token].join(''); -}; - -export default class EdgeUpdateManager { - constructor() { - this.checkedEdges = {}; - } - - checkDuplicate(edge) { - const key = getKey(edge.from, edge.to, edge.token); - if (key in this.checkedEdges) { - return true; - } - this.checkedEdges[key] = true; - return false; - } - - async updateEdge(edge, tokenAddress) { - // Don't store edges from relayer - if (edge.from === process.env.TX_SENDER_ADDRESS) { - return; - } - - // Don't store edges to or from zero address - if (edge.to === ZERO_ADDRESS || edge.from === ZERO_ADDRESS) { - return; - } - - // Ignore self-referential edges - if (edge.from === edge.to) { - return; - } - - // Ignore duplicates - if (this.checkDuplicate(edge)) { - return; - } - - // Update edge capacity - try { - // Get send limit - const limit = await hubContract.methods - .checkSendLimit(edge.token, edge.from, edge.to) - .call(); - - // Get Token balance - const tokenContract = new web3.eth.Contract( - TokenContract.abi, - tokenAddress, - ); - const balance = await tokenContract.methods.balanceOf(edge.from).call(); - - // Update edge capacity - edge.capacity = minNumberString(limit, balance); - - await upsertEdge(edge); - } catch (error) { - logger.error( - `Found error with checking sending limit for token of ${edge.token} from ${edge.from} to ${edge.to} [${error}]`, - ); - } - } -} diff --git a/src/services/findTransferSteps.js b/src/services/findTransferSteps.js deleted file mode 100644 index 9e71deda..00000000 --- a/src/services/findTransferSteps.js +++ /dev/null @@ -1,58 +0,0 @@ -import findTransferSteps from '@circles/transfer'; -import { performance } from 'perf_hooks'; - -import { - EDGES_FILE_PATH, - HOPS_DEFAULT, - PATHFINDER_FILE_PATH, -} from '../constants'; - -const DEFAULT_PROCESS_TIMEOUT = 1000 * 200; -const FLAG = '--csv'; - -export default async function transferSteps({ - from, - to, - value, - hops = HOPS_DEFAULT, -}) { - if (from === to) { - throw new Error('Cannot send to yourself'); - } - const startTime = performance.now(); - - const timeout = process.env.TRANSFER_STEPS_TIMEOUT - ? parseInt(process.env.TRANSFER_STEPS_TIMEOUT, 10) - : DEFAULT_PROCESS_TIMEOUT; - - const result = await findTransferSteps( - { - from, - to, - value, - hops: hops.toString(), - }, - { - edgesFile: EDGES_FILE_PATH, - pathfinderExecutable: PATHFINDER_FILE_PATH, - flag: FLAG, - timeout, - }, - ); - - const endTime = performance.now(); - - return { - from, - to, - maxFlowValue: result.maxFlowValue, - processDuration: Math.round(endTime - startTime), - transferValue: value, - transferSteps: result.transferSteps.map(({ token, ...step }) => { - return { - ...step, - tokenOwnerAddress: token, - }; - }), - }; -} diff --git a/src/services/graph.js b/src/services/graph.js index 14ce4223..4257c00c 100644 --- a/src/services/graph.js +++ b/src/services/graph.js @@ -1,179 +1,8 @@ -import fetch from 'isomorphic-fetch'; - -import logger from '../helpers/logger'; -import loop from '../helpers/loop'; import core from './core'; -const PAGINATION_SIZE = 500; - -function isOfficialNode() { - return process.env.GRAPH_NODE_ENDPOINT.includes('api.thegraph.com'); -} - -async function fetchFromGraphStatus(query) { - const endpoint = isOfficialNode() - ? `${process.env.GRAPH_NODE_ENDPOINT}/index-node/graphql` - : `${process.env.GRAPH_NODE_INDEXING_STATUS_ENDPOINT}/graphql`; - return await fetch(endpoint, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - query: query.replace(/\s\s+/g, ' '), - }), - }) - .then((response) => { - return response.json(); - }) - .then((response) => { - return response.data; - }); -} - -// This function aims to replace `fetchFromGraphStatus()` when `index-node` -// requests don't work for thegraph.com/hosted-service -async function fetchFromSubgraphStatus(query) { - const endpoint = `${process.env.GRAPH_NODE_ENDPOINT}/subgraphs/name/${process.env.SUBGRAPH_NAME}`; - logger.info(`Graph endpoint: ${endpoint}`); - return await fetch(endpoint, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - query: query.replace(/\s\s+/g, ' '), - }), - }) - .then((response) => { - return response.json(); - }) - .then((response) => { - return response.data; - }); -} - -async function wait(ms) { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); -} - export async function requestGraph(query) { // Strip newlines in query before doing request return await core.utils.requestGraph({ query: query.replace(/(\r\n|\n|\r)/gm, ' '), }); } - -export async function fetchFromGraph( - name, - fields, - extra = '', - lastID = '', - first = PAGINATION_SIZE, -) { - const query = `{ - ${name}(${extra} first: ${first}, orderBy: id, where: { id_gt: "${lastID}"}) { - ${fields} - } - }`; - const data = await requestGraph(query); - if (!data) { - logger.error(`Error requesting graph with query: ${query}`); - return false; - } - return data[name]; -} - -async function* fetchGraphGenerator(name, fields, extra = '') { - // The `skip` argument must be between 0 and 5000 (current limitations by TheGraph). - // Therefore, we sort the elements by id and reference the last element id for the next query - let hasData = true; - let lastID = ''; - - while (hasData) { - //console.log({lastID}); - const data = await fetchFromGraph(name, fields, extra, lastID); - await wait(500); - hasData = data.length > 0; - if (hasData) lastID = data[data.length - 1].id; - yield data; - } -} - -export default async function fetchAllFromGraph(name, fields, extra = '') { - let result = []; - let index = 0; - - for await (let data of fetchGraphGenerator(name, fields, extra)) { - result = result.concat( - data.map((entry) => { - entry.index = ++index; - return entry; - }), - ); - } - - return result; -} - -export async function waitUntilGraphIsReady() { - const query = `{ _meta { block { number } } }`; - return await loop( - async () => { - try { - return await fetchFromSubgraphStatus(query); - } catch { - return false; - } - }, - (isHealthy) => { - return isHealthy; - }, - ); -} - -export async function waitForBlockNumber(blockNumber) { - const query = `{ - indexingStatusForCurrentVersion(subgraphName: "${process.env.SUBGRAPH_NAME}") { - chains { - latestBlock { - number - } - } - } - }`; - - await loop( - () => { - return fetchFromGraphStatus(query); - }, - (data) => { - const { chains } = data.indexingStatusForCurrentVersion; - if (chains.length === 0) { - return false; - } - return parseInt(chains[0].latestBlock.number, 10) >= blockNumber; - }, - ); -} - -export async function getBlockNumber() { - const query = `{ - indexingStatusForCurrentVersion(subgraphName: "${process.env.SUBGRAPH_NAME}") { - chains { - latestBlock { - number - } - } - } - }`; - - const data = await fetchFromGraphStatus(query); - const { chains } = data.indexingStatusForCurrentVersion; - if (chains.length === 0) { - return 0; - } - return parseInt(chains[0].latestBlock.number, 10); -} diff --git a/src/services/metrics.js b/src/services/metrics.js deleted file mode 100644 index 8ef77468..00000000 --- a/src/services/metrics.js +++ /dev/null @@ -1,30 +0,0 @@ -import Metric from '../models/metrics'; - -export async function setMetrics(category, metrics = []) { - const promises = metrics.map(({ name, value }) => { - return Metric.update( - { value }, - { - where: { - category, - name, - }, - }, - ); - }); - - return Promise.all(promises); -} - -export async function getMetrics(category) { - return Metric.findAll({ - where: { - category, - }, - }).then((response) => { - return response.reduce((acc, item) => { - acc[item.name] = parseInt(item.value, 10); - return acc; - }, {}); - }); -} diff --git a/src/services/redis.js b/src/services/redis.js deleted file mode 100644 index 5ccf80ff..00000000 --- a/src/services/redis.js +++ /dev/null @@ -1,22 +0,0 @@ -export const redisUrl = process.env.REDIS_URL || 'redis://127.0.0.1:6379'; - -export const redisOptions = { - settings: { - lockDuration: 1000 * 30, // Key expiration time for job locks - stalledInterval: 1000 * 30, // How often check for stalled jobs (use 0 for never checking) - maxStalledCount: 1, // Max amount of times a stalled job will be re-processed - guardInterval: 1000 * 5, // Poll interval for delayed jobs and added jobs - retryProcessDelay: 1000 * 5, // delay before processing next job in case of internal error - }, -}; - -export const redisLongRunningOptions = { - settings: { - lockDuration: 1000 * 60 * 30, - lockRenewTime: 1000 * 15, - stalledInterval: 1000 * 60 * 1, - maxStalledCount: 2, - guardInterval: 1000 * 10, - retryProcessDelay: 1000 * 15, - }, -}; diff --git a/src/services/updateTransferSteps.js b/src/services/updateTransferSteps.js deleted file mode 100644 index a3e51ffc..00000000 --- a/src/services/updateTransferSteps.js +++ /dev/null @@ -1,86 +0,0 @@ -import HubContract from '@circles/circles-contracts/build/contracts/Hub.json'; -import findTransferSteps from '@circles/transfer'; - -import web3 from './web3'; -import EdgeUpdateManager from './edgesUpdate'; -import logger from '../helpers/logger'; -import tasks from '../tasks'; -import submitJob from '../tasks/submitJob'; -import { - EDGES_FILE_PATH, - HOPS_DEFAULT, - PATHFINDER_FILE_PATH, -} from '../constants'; - -const DEFAULT_PROCESS_TIMEOUT = 1000 * 200; -const FLAG = '--csv'; - -const hubContract = new web3.eth.Contract( - HubContract.abi, - process.env.HUB_ADDRESS, -); - -// All the steps are updated -async function updateSteps(result) { - const edgeUpdateManager = new EdgeUpdateManager(); - - const values = await Promise.allSettled( - result.transferSteps.map(async (step) => { - const tokenAddress = await hubContract.methods - .userToToken(step.token) - .call(); - - // Update the edge - await edgeUpdateManager.updateEdge( - { - token: step.token, - from: step.from, - to: step.to, - }, - tokenAddress, - ); - - return true; - }), - ); - - // Write edges.csv file to update edges - submitJob(tasks.exportEdges, 'exportEdges-updateSteps'); - - return values.every((step) => step.status === 'fulfilled'); -} - -export default async function updatePath({ - from, - to, - value, - hops = HOPS_DEFAULT, -}) { - const timeout = process.env.TRANSFER_STEPS_TIMEOUT - ? parseInt(process.env.TRANSFER_STEPS_TIMEOUT, 10) - : DEFAULT_PROCESS_TIMEOUT; - - try { - return { - updated: await updateSteps( - await findTransferSteps( - { - from, - to, - value, - hops: hops.toString(), - }, - { - edgesFile: EDGES_FILE_PATH, - pathfinderExecutable: PATHFINDER_FILE_PATH, - flag: FLAG, - timeout, - }, - ), - ), - }; - } catch (error) { - logger.error(`Error updating steps [${error.message}]`); - throw error; - } -} diff --git a/src/tasks/cleanup.js b/src/tasks/cleanup.js deleted file mode 100644 index 061f5e91..00000000 --- a/src/tasks/cleanup.js +++ /dev/null @@ -1,18 +0,0 @@ -import Queue from 'bull'; - -import processor from './processor'; -import { allTasks } from './'; -import { redisUrl, redisLongRunningOptions } from '../services/redis'; - -const cleanup = new Queue('Clean up queues', redisUrl, { - settings: redisLongRunningOptions, -}); - -processor(cleanup).process(async () => { - return await Promise.all([ - cleanup.clean(0), - ...allTasks.map((queue) => queue.clean(1000, 'completed')), - ]); -}); - -export default cleanup; diff --git a/src/tasks/exportEdges.js b/src/tasks/exportEdges.js deleted file mode 100644 index b0a82717..00000000 --- a/src/tasks/exportEdges.js +++ /dev/null @@ -1,35 +0,0 @@ -import Queue from 'bull'; -import { performance } from 'perf_hooks'; - -import logger from '../helpers/logger'; -import processor from './processor'; -import { redisUrl, redisOptions } from '../services/redis'; -import { writeToFile } from '../services/edgesFile'; - -const exportEdges = new Queue('Export edges to csv', redisUrl, { - settings: redisOptions, -}); - -processor(exportEdges).process(async () => { - // Measure time of the whole process - const startTime = performance.now(); - - try { - // Write edges.csv - const lines = await writeToFile(); - - // End time - const endTime = performance.now(); - const milliseconds = Math.round(endTime - startTime); - - // Show metrics - logger.info(`Written ${lines} lines edges.csv in ${milliseconds}ms`); - - return Promise.resolve(); - } catch (error) { - logger.error(`Export edges failed [${error.message}]`); - throw error; - } -}); - -export default exportEdges; diff --git a/src/tasks/index.js b/src/tasks/index.js deleted file mode 100644 index 32b072bf..00000000 --- a/src/tasks/index.js +++ /dev/null @@ -1,21 +0,0 @@ -import cleanup from './cleanup'; -import exportEdges from './exportEdges'; -import syncAddress from './syncAddress'; -import syncFullGraph from './syncFullGraph'; -import uploadEdgesS3 from './uploadEdgesS3'; - -export const allTasks = [ - cleanup, - exportEdges, - syncAddress, - syncFullGraph, - uploadEdgesS3, -]; - -export default { - cleanup, - exportEdges, - syncAddress, - syncFullGraph, - uploadEdgesS3, -}; diff --git a/src/tasks/processor.js b/src/tasks/processor.js deleted file mode 100644 index e8d092ff..00000000 --- a/src/tasks/processor.js +++ /dev/null @@ -1,47 +0,0 @@ -import logger from '../helpers/logger'; - -export default function processor(queue) { - queue.on('error', (error) => { - logger.error(`ERROR "${queue.name}" job: ${error}`); - - // eslint-disable-next-line - console.error(error); - }); - - queue.on('active', (job) => { - logger.info(`[${job.id}] ACTIVE "${queue.name}" job started`); - }); - - queue.on('progress', (job, progress) => { - logger.info(`[${job.id}]" PROGRESS ${queue.name}" job: ${progress}`); - }); - - queue.on('completed', (job) => { - logger.info(`[${job.id}] COMPLETE "${queue.name}" job`); - }); - - queue.on('failed', (job, error) => { - logger.warn(`[${job.id}] FAILED "${queue.name}": ${error}`); - - // eslint-disable-next-line - console.error(error); - }); - - queue.on('stalled', (job) => { - logger.info(`[${job.id}] STALLED "${queue.name}"`); - }); - - queue.on('cleaned', (jobs, type) => { - logger.info(`"${queue.name}" cleaned ${type} ${jobs.length}`); - }); - - queue.on('paused', () => { - logger.info(`"${queue.name}" queue paused`); - }); - - queue.on('resumed', () => { - logger.info(`"${queue.name}" queue resumed`); - }); - - return queue; -} diff --git a/src/tasks/submitJob.js b/src/tasks/submitJob.js deleted file mode 100644 index f90f4265..00000000 --- a/src/tasks/submitJob.js +++ /dev/null @@ -1,24 +0,0 @@ -import logger from '../helpers/logger'; - -const jobDefaultOptions = { - timeout: 1000 * 60 * 40, - attempts: 100, - removeOnComplete: true, - backoff: { type: 'fixed', delay: 1000 * 10 }, -}; - -export default function submitJob(queue, id, data = {}, jobOptions = {}) { - return queue.getJob(id).then((job) => { - if (job) { - logger.warn(`Job "${queue.name}" with id "${id}" is already running`); - return; - } - - logger.info(`Adding job "${queue.name}" with id "${id}"`); - - return queue.add( - { id, ...data }, - { jobId: id, ...jobDefaultOptions, ...jobOptions }, - ); - }); -} diff --git a/src/tasks/syncAddress.js b/src/tasks/syncAddress.js deleted file mode 100644 index c219a765..00000000 --- a/src/tasks/syncAddress.js +++ /dev/null @@ -1,35 +0,0 @@ -import Queue from 'bull'; - -import processor from './processor'; -import submitJob from './submitJob'; -import tasks from './'; -import { - processTrustEvent, - processTransferEvent, -} from '../services/edgesFromEvents'; -import { redisUrl, redisOptions } from '../services/redis'; - -const syncAddress = new Queue('Sync trust graph for address', redisUrl, { - settings: redisOptions, -}); - -processor(syncAddress).process(async (job) => { - let isSuccessful = false; - - // This job is either triggered by a transfer event or a trust event. - if (job.data.type === 'Transfer') { - isSuccessful = await processTransferEvent(job.data); - } else { - isSuccessful = await processTrustEvent(job.data); - } - - // Always write edges .csv file afterwards - if (isSuccessful) { - submitJob( - tasks.exportEdges, - `exportEdges-after-chain-event-${job.data.transactionHash}`, - ); - } -}); - -export default syncAddress; diff --git a/src/tasks/syncFullGraph.js b/src/tasks/syncFullGraph.js deleted file mode 100644 index 482e5ffb..00000000 --- a/src/tasks/syncFullGraph.js +++ /dev/null @@ -1,68 +0,0 @@ -import Queue from 'bull'; -import { performance } from 'perf_hooks'; - -import EdgeUpdateManager from '../services/edgesUpdate'; -import logger from '../helpers/logger'; -import processor from './processor'; -import submitJob from './submitJob'; -import tasks from './'; -import { getBlockNumber } from '../services/graph'; -import { getTrustNetworkEdges } from '../services/edgesFromGraph'; -import { redisUrl, redisLongRunningOptions } from '../services/redis'; - -const syncFullGraph = new Queue('Sync full trust graph', redisUrl, { - settings: redisLongRunningOptions, -}); - -async function rebuildTrustNetwork() { - const edgeUpdateManager = new EdgeUpdateManager(); - const blockNumber = await getBlockNumber(); - - if (blockNumber === 0) { - logger.warn('Found block number 0 from graph, aborting'); - return; - } - - logger.info(`Syncing trust graph with current block ${blockNumber}`); - - // Measure time of the whole process - const startTime = performance.now(); - - try { - const { edges, statistics } = await getTrustNetworkEdges(); - - logger.info( - `Finished getting trust network edges (${edges.length} entities). Start updating capacities.`, - ); - - for await (const edge of edges) { - await edgeUpdateManager.updateEdge( - { - ...edge, - token: edge.tokenOwner, - }, - edge.tokenAddress, - ); - } - - const endTime = performance.now(); - const milliseconds = Math.round(endTime - startTime); - - const checkedEdges = Object.keys(edgeUpdateManager.checkedEdges).length; - logger.info( - `Updated ${checkedEdges} edges with ${statistics.safes} safes, ${statistics.connections} connections and ${statistics.tokens} tokens (${milliseconds}ms)`, - ); - } catch (error) { - logger.error(`Worker failed [${error.message}]`); - throw error; - } -} - -processor(syncFullGraph).process(async () => { - await rebuildTrustNetwork(); - - // Always write edges .csv file afterwards - submitJob(tasks.exportEdges, `exportEdges-after-fullSync`); -}); - -export default syncFullGraph; diff --git a/src/tasks/uploadEdgesS3.js b/src/tasks/uploadEdgesS3.js deleted file mode 100644 index ccaad089..00000000 --- a/src/tasks/uploadEdgesS3.js +++ /dev/null @@ -1,32 +0,0 @@ -import Queue from 'bull'; -import fs from 'fs'; - -import logger from '../helpers/logger'; -import processor from './processor'; -import { EDGES_FILE_PATH } from '../constants'; -import { checkFileExists } from '../services/edgesFile'; -import { redisUrl, redisOptions } from '../services/redis'; -import { s3 } from '../services/aws'; -import { PutObjectCommand } from '@aws-sdk/client-s3'; - -const uploadEdgesS3 = new Queue('Upload edges to S3 storage', redisUrl, { - settings: redisOptions, -}); - -processor(uploadEdgesS3).process(async () => { - if (!checkFileExists()) { - logger.log(`${EDGES_FILE_PATH} does not exist yet. Skip job`); - return; - } - - const edges = fs.readFileSync(EDGES_FILE_PATH); - const params = { - Bucket: process.env.AWS_S3_BUCKET_TRUST_NETWORK, - Key: `${new Date()}.csv`, - Body: edges, - ACL: 'public-read', - ContentType: 'application/json', - }; - return await s3.send(new PutObjectCommand(params)); -}); -export default uploadEdgesS3; diff --git a/src/worker.js b/src/worker.js deleted file mode 100644 index 751d91c5..00000000 --- a/src/worker.js +++ /dev/null @@ -1,107 +0,0 @@ -import HubContract from '@circles/circles-contracts/build/contracts/Hub.json'; -import TokenContract from '@circles/circles-contracts/build/contracts/Token.json'; - -import './helpers/env'; - -import db from './database'; -import logger from './helpers/logger'; -import tasks from './tasks'; -import submitJob from './tasks/submitJob'; -import web3 from './services/web3'; -import { - checkConnection, - getEventSignature, - subscribeEvent, -} from './services/web3Ws'; -import { waitUntilGraphIsReady } from './services/graph'; - -const CRON_NIGHTLY = '0 0 0 * * *'; - -// Connect with postgres database -db.authenticate() - .then(() => { - logger.info('Database connection has been established successfully'); - }) - .catch(() => { - logger.error('Unable to connect to database'); - process.exit(1); - }); - -// Check blockchain connection -checkConnection() - .then((num) => { - logger.info(`Blockchain connection established, block height is ${num}`); - }) - .catch(() => { - logger.error('Unable to connect to blockchain'); - process.exit(1); - }); - -// Listen for blockchain events which might alter the trust limit between users -// in the trust network -const hubContract = new web3.eth.Contract(HubContract.abi); -const tokenContract = new web3.eth.Contract(TokenContract.abi); - -const transferSignature = getEventSignature(tokenContract, 'Transfer'); -const trustSignature = getEventSignature(hubContract, 'Trust'); - -function handleTrustChange({ address, topics, transactionHash }) { - if (topics.includes(transferSignature)) { - submitJob( - tasks.syncAddress, - `syncAddress-transfer-${address}-${Date.now()}`, - { - tokenAddress: address, - type: 'Transfer', - topics, - transactionHash, - }, - ); - } else if (topics.includes(trustSignature)) { - submitJob( - tasks.syncAddress, - `syncAddress-trust-${topics[1]}-${Date.now()}`, - { - type: 'Trust', - topics, - transactionHash, - }, - ); - } -} - -waitUntilGraphIsReady() - .then(() => { - logger.info('Graph node connection has been established successfully'); - }) - .then(() => { - // Subscribe to events to handle trust graph updates for single addresses - subscribeEvent( - hubContract, - process.env.HUB_ADDRESS, - 'Trust', - handleTrustChange, - ); - subscribeEvent(tokenContract, null, 'Transfer', handleTrustChange); - - // Clean up worker queues every night - submitJob(tasks.cleanup, 'cleanUp-nightly', null, { - repeat: { - cron: CRON_NIGHTLY, - }, - }); - - // Upload latest edges .json to S3 every night - submitJob(tasks.uploadEdgesS3, 'uploadEdgesS3-nightly', null, { - repeat: { - cron: CRON_NIGHTLY, - }, - }); - - // Always write edges.json file on start to make sure it exists - submitJob(tasks.exportEdges, 'exportEdges-initial'); - }) - .catch(() => { - logger.error('Unable to connect to graph node'); - process.exit(1); - }); diff --git a/test/transfers-create.test.js b/test/transfers-create.test.js deleted file mode 100644 index be447223..00000000 --- a/test/transfers-create.test.js +++ /dev/null @@ -1,53 +0,0 @@ -import httpStatus from 'http-status'; -import request from 'supertest'; - -import { createTransferPayload } from './utils/transfers'; -import { randomChecksumAddress, randomTransactionHash } from './utils/common'; - -import Transfer from '~/models/transfers'; -import app from '~'; - -describe('PUT /transfers - Creating a new transfer', () => { - let from; - let to; - let transactionHash; - let paymentNote; - - let payload; - - beforeEach(() => { - from = randomChecksumAddress(); - to = randomChecksumAddress(); - transactionHash = randomTransactionHash(); - paymentNote = 'Thank you for the banana'; - - payload = createTransferPayload({ - from, - to, - transactionHash, - paymentNote, - }); - }); - - afterAll(async () => { - return await Transfer.destroy({ - where: { - transactionHash, - }, - }); - }); - - it('should successfully respond and fail when we try again', async () => { - await request(app) - .put('/api/transfers') - .send(payload) - .set('Accept', 'application/json') - .expect(httpStatus.CREATED); - - await request(app) - .put('/api/transfers') - .send(payload) - .set('Accept', 'application/json') - .expect(httpStatus.CONFLICT); - }); -}); diff --git a/test/transfers-find-steps.test.js b/test/transfers-find-steps.test.js deleted file mode 100644 index f25d16f8..00000000 --- a/test/transfers-find-steps.test.js +++ /dev/null @@ -1,49 +0,0 @@ -import httpStatus from 'http-status'; -import request from 'supertest'; - -import { mockGraphSafes } from './utils/mocks'; -import { randomChecksumAddress } from './utils/common'; - -import app from '~'; - -describe('POST /transfers - Find transfer steps', () => { - beforeAll(async () => { - mockGraphSafes(); - }); - - it('should return an error when value is not positive', async () => { - await request(app) - .post('/api/transfers') - .send({ - from: randomChecksumAddress(), - to: randomChecksumAddress(), - value: 0, - }) - .set('Accept', 'application/json') - .expect(httpStatus.BAD_REQUEST); - }); - it('should return an error when hops is not positive', async () => { - await request(app) - .post('/api/transfers') - .send({ - from: randomChecksumAddress(), - to: randomChecksumAddress(), - value: '5', - hops: '0', - }) - .set('Accept', 'application/json') - .expect(httpStatus.BAD_REQUEST); - }); - it('should return an error when hops is empty', async () => { - await request(app) - .post('/api/transfers') - .send({ - from: randomChecksumAddress(), - to: randomChecksumAddress(), - value: '5', - hops: '', - }) - .set('Accept', 'application/json') - .expect(httpStatus.BAD_REQUEST); - }); -}); diff --git a/test/transfers-find.test.js b/test/transfers-find.test.js deleted file mode 100644 index 843d60e5..00000000 --- a/test/transfers-find.test.js +++ /dev/null @@ -1,241 +0,0 @@ -import httpStatus from 'http-status'; -import request from 'supertest'; - -import web3 from './utils/web3'; -import { mockGraphUsers } from './utils/mocks'; -import { - randomTransactionHash, - randomChecksumAddress, - getSignature, -} from './utils/common'; - -import Transfer from '~/models/transfers'; -import app from '~'; - -const NUM_TEST_TRANSFERS = 5; - -const transfers = []; -const accounts = []; - -async function expectTransfer(app, account, { transactionHash, from, to }) { - mockGraphUsers(account.address, to); - const signature = getSignature([transactionHash], account.privateKey); - - return await request(app) - .post(`/api/transfers/${transactionHash}`) - .send({ - address: account.address, - signature, - }) - .set('Accept', 'application/json') - .expect(httpStatus.OK) - .expect(({ body }) => { - if (!('id' in body.data)) { - throw new Error('id missing'); - } - - if (body.data.transactionHash !== transactionHash) { - throw new Error('Wrong transactionHash'); - } - - if (body.data.from !== from) { - throw new Error('Wrong from address'); - } - - if (body.data.to !== to) { - throw new Error('Wrong to address'); - } - }); -} - -beforeAll(async () => { - const items = new Array(NUM_TEST_TRANSFERS).fill(0); - - await Promise.all( - items.map(async () => { - const account = web3.eth.accounts.create(); - const address = account.address; - const privateKey = account.privateKey; - - const from = randomChecksumAddress(); - const to = randomChecksumAddress(); - const transactionHash = randomTransactionHash(); - const paymentNote = `This is a payment note ${Math.random() * 10000}`; - - const signature = getSignature([from, to, transactionHash], privateKey); - - await request(app) - .put('/api/transfers') - .send({ - address, - signature, - data: { - from, - to, - transactionHash, - paymentNote, - }, - }) - .set('Accept', 'application/json') - .expect(httpStatus.CREATED); - - accounts.push(account); - - transfers.push({ - from, - to, - transactionHash, - paymentNote, - }); - }), - ); -}); - -afterAll(async () => { - await Promise.all( - transfers.map(async (transfer) => { - return await Transfer.destroy({ - where: { - transactionHash: transfer.transactionHash, - }, - }); - }), - ); -}); - -describe('POST /transfers/:transactionHash - Resolve by transactionHash', () => { - it('should find one transfer', async () => { - await Promise.all( - transfers.map(async (transfer, index) => { - const account = accounts[index]; - return await expectTransfer(app, account, transfer); - }), - ); - }); - - it('should throw an error when signature is invalid', async () => { - const transactionHash = transfers[1].transactionHash; - const account = accounts[1]; - - const signature = getSignature( - [randomTransactionHash()], - account.privateKey, - ); - - await request(app) - .post(`/api/transfers/${transactionHash}`) - .send({ - address: account.address, - signature, - }) - .set('Accept', 'application/json') - .expect(httpStatus.FORBIDDEN); - }); - - describe('validation', () => { - let sender; - let receiver; - let from; - let to; - let transactionHash; - let paymentNote; - - beforeEach(async () => { - sender = web3.eth.accounts.create(); - receiver = web3.eth.accounts.create(); - - from = randomChecksumAddress(); - to = randomChecksumAddress(); - transactionHash = randomTransactionHash(); - paymentNote = 'Thank you!'; - - const signature = getSignature( - [from, to, transactionHash], - sender.privateKey, - ); - - await request(app) - .put('/api/transfers') - .send({ - address: sender.address, - signature, - data: { - from, - to, - transactionHash, - paymentNote, - }, - }) - .set('Accept', 'application/json') - .expect(httpStatus.CREATED); - - mockGraphUsers(sender.address, from); - mockGraphUsers(receiver.address, to); - }); - - it('should return the result for only sender or receiver', async () => { - const senderSignature = getSignature( - [transactionHash], - sender.privateKey, - ); - const receiverSignature = getSignature( - [transactionHash], - receiver.privateKey, - ); - - await request(app) - .post(`/api/transfers/${transactionHash}`) - .send({ - address: sender.address, - signature: senderSignature, - }) - .set('Accept', 'application/json') - .expect(httpStatus.OK); - - await request(app) - .post(`/api/transfers/${transactionHash}`) - .send({ - address: receiver.address, - signature: receiverSignature, - }) - .set('Accept', 'application/json') - .expect(httpStatus.OK); - }); - - it('should return an error when signature is valid but entry was not found', async () => { - const wrongTransactionHash = randomTransactionHash(); - const senderSignature = getSignature( - [wrongTransactionHash], - sender.privateKey, - ); - - await request(app) - .post(`/api/transfers/${wrongTransactionHash}`) - .send({ - address: sender.address, - signature: senderSignature, - }) - .set('Accept', 'application/json') - .expect(httpStatus.NOT_FOUND); - }); - - it('should throw an error when sender or receiver is not the signer', async () => { - const thirdAccount = web3.eth.accounts.create(); - const signature = getSignature( - [transactionHash], - thirdAccount.privateKey, - ); - - mockGraphUsers(thirdAccount.address, randomChecksumAddress()); - - await request(app) - .post(`/api/transfers/${transactionHash}`) - .send({ - address: thirdAccount.address, - signature, - }) - .set('Accept', 'application/json') - .expect(httpStatus.FORBIDDEN); - }); - }); -}); diff --git a/test/transfers-update-steps.test.js b/test/transfers-update-steps.test.js deleted file mode 100644 index c8755058..00000000 --- a/test/transfers-update-steps.test.js +++ /dev/null @@ -1,25 +0,0 @@ -import httpStatus from 'http-status'; -import request from 'supertest'; - -import { mockGraphSafes } from './utils/mocks'; -import { randomChecksumAddress } from './utils/common'; - -import app from '~'; - -describe('POST /transfers/update - Update transfer steps', () => { - beforeAll(async () => { - mockGraphSafes(); - }); - - it('should return an error when value is not positive', async () => { - await request(app) - .post('/api/transfers/update') - .send({ - from: randomChecksumAddress(), - to: randomChecksumAddress(), - value: 0, - }) - .set('Accept', 'application/json') - .expect(httpStatus.BAD_REQUEST); - }); -}); diff --git a/test/transfers-validation.test.js b/test/transfers-validation.test.js deleted file mode 100644 index a0e2d766..00000000 --- a/test/transfers-validation.test.js +++ /dev/null @@ -1,147 +0,0 @@ -import httpStatus from 'http-status'; -import request from 'supertest'; - -import web3 from './utils/web3'; -import { - randomChecksumAddress, - randomTransactionHash, - getSignature, -} from './utils/common'; - -import app from '~'; - -async function expectErrorStatus(body, status = httpStatus.BAD_REQUEST) { - return await request(app) - .put('/api/transfers') - .send(body) - .set('Accept', 'application/json') - .expect(status); -} - -describe('PUT /transfers - validation', () => { - let address; - let privateKey; - let signature; - let from; - let to; - let transactionHash; - let paymentNote; - let correctBody; - - beforeEach(() => { - const account = web3.eth.accounts.create(); - - address = account.address; - privateKey = account.privateKey; - - from = randomChecksumAddress(); - to = randomChecksumAddress(); - transactionHash = randomTransactionHash(); - paymentNote = 'Thank you for the banana'; - - signature = getSignature([from, to, transactionHash], privateKey); - - correctBody = { - address, - signature, - data: { - from, - to, - transactionHash, - paymentNote, - }, - }; - }); - - describe('when using invalid parameters', () => { - it('should return errors', async () => { - // Missing fields - await expectErrorStatus({ - ...correctBody, - address: 'invalid', - }); - - // Missing signature - await expectErrorStatus({ - ...correctBody, - signature: '', - }); - - // Wrong address - await expectErrorStatus({ - ...correctBody, - address: web3.utils.randomHex(21), - }); - - // Wrong address checksum - await expectErrorStatus({ - ...correctBody, - address: web3.utils.randomHex(20), - }); - - // Invalid transaction hash - await expectErrorStatus({ - ...correctBody, - data: { - ...correctBody.data, - transaction: web3.utils.randomHex(10), - }, - }); - - // Invalid from field - await expectErrorStatus({ - ...correctBody, - data: { - ...correctBody.data, - from: web3.utils.randomHex(16), - }, - }); - - // Invalid payment note - await expectErrorStatus({ - ...correctBody, - data: { - ...correctBody.data, - paymentNote: 123, - }, - }); - }); - }); - - describe('when using invalid signatures', () => { - it('should return errors', async () => { - // Wrong address - await expectErrorStatus( - { - ...correctBody, - address: randomChecksumAddress(), - }, - httpStatus.FORBIDDEN, - ); - - // Wrong from field - await expectErrorStatus( - { - ...correctBody, - data: { - ...correctBody.data, - from: randomChecksumAddress(), - }, - }, - httpStatus.FORBIDDEN, - ); - - // Wrong transaction hash - await expectErrorStatus( - { - ...correctBody, - data: { - ...correctBody.data, - transactionHash: randomTransactionHash(), - }, - }, - httpStatus.FORBIDDEN, - ); - }); - }); -});