-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpcProxyWax.js
176 lines (160 loc) · 5.2 KB
/
rpcProxyWax.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
const express = require('express')
var app = express()
const ax = require('axios')
const https = require('https');
const http = require('http');
/** Returns a promise that settles (with no value) once `ms` milliseconds have elapsed. */
const sleep = (ms) => new Promise((resolve) => { setTimeout(resolve, ms) })

/**
 * Random integer between ceil(min) and floor(max), both bounds included.
 * @param {number} min lower bound (rounded up)
 * @param {number} max upper bound (rounded down)
 * @returns {number}
 */
const rand = (min, max) => {
  const low = Math.ceil(min)
  const high = Math.floor(max)
  const span = high - low + 1
  return Math.floor(Math.random() * span) + low
}

/**
 * Returns one randomly chosen element of `arr` (undefined for an empty array).
 * @param {Array} arr
 */
const randSelect = (arr) => {
  const index = rand(0, arr.length - 1)
  return arr[index]
}
const logger = require('logging').default('rpcProxyNew')
const fs = require('fs-extra')
// Every outbound axios request shares the same 5000 ms limit: it is applied both as the
// axios request timeout and as the `timeout` option of the HTTP and HTTPS agents.
const UPSTREAM_TIMEOUT_MS = 5000
Object.assign(ax.defaults, {
  timeout: UPSTREAM_TIMEOUT_MS,
  httpAgent: new http.Agent({ timeout: UPSTREAM_TIMEOUT_MS }),
  httpsAgent: new https.Agent({ timeout: UPSTREAM_TIMEOUT_MS })
})
/**
 * Middleware that rewrites the request's Content-Type header to 'application/json'
 * when the header is absent or is exactly 'application/x-www-form-urlencoded',
 * before the body parsers registered below see the request.
 */
function forceJsonContentType (req, res, next) {
  const declaredType = req.headers['content-type']
  if (!declaredType || declaredType === 'application/x-www-form-urlencoded') {
    req.headers['content-type'] = 'application/json'
  }
  next()
}

// Largest request body accepted by either parser.
const BODY_LIMIT = '20mb'

app.use(forceJsonContentType)
app.use(express.text({ limit: BODY_LIMIT }))
app.use(express.json({ limit: BODY_LIMIT, strict: false }))
app.set('trust proxy', 1)
// Counters keyed by endpoint. Starts empty and is replaced by the contents of
// ./metrics.json when that file can be loaded.
let metrics = {}
try {
  metrics = require('./metrics.json')
} catch (loadError) {
  // Loading failed: report it and keep the empty object assigned above.
  console.log(loadError)
}
// Upstream RPC endpoints the proxy may forward to. Declared explicitly (it used to be
// created as an implicit global on first assignment) and starts empty so readers never
// hit an undeclared name before the first successful sync.
let endpoints = []

/**
 * Download the remote endpoint list and store it in `endpoints`.
 *
 * On success the next refresh is scheduled 24 hours later. On any failure — request
 * error, or a payload that is not a non-empty array — the previous list is kept, the
 * error is logged, and a retry is scheduled 5 minutes later.
 *
 * @returns {Promise<void>} never rejects; failures are handled internally
 */
async function syncEndpoints () {
  try {
    const { data } = await ax.get('https://wax.stats.eosusa.news/public/rpc/endpoints-wax.json')
    // Consumers index into and filter this value, so anything but a populated array
    // (e.g. an HTML error page served with status 200) must not replace the list.
    if (!Array.isArray(data) || data.length === 0) {
      throw new Error('Endpoint list response was not a non-empty array')
    }
    endpoints = data
    console.log('Getting Remote Endpoints', endpoints)
    setTimeout(syncEndpoints, 86400000)
  } catch (err) {
    console.error(err)
    // Retry every 5 mins
    setTimeout(syncEndpoints, 300000)
  }
}
/**
 * Write the in-memory `metrics` object to ./metrics.json, then schedule the next
 * write 60 seconds later. A failed write is logged and does not stop the cycle.
 *
 * @returns {Promise<void>}
 */
async function syncMetrics () {
  const metricsFile = './metrics.json'
  const intervalMs = 60000
  try {
    await fs.writeJson(metricsFile, metrics)
  } catch (writeError) {
    console.error(writeError)
  }
  setTimeout(syncMetrics, intervalMs)
}
/**
 * Choose a random endpoint from `endpoints`, skipping every entry that is currently
 * in `greylist`.
 *
 * @returns {*} one element of `endpoints`, or undefined when no non-greylisted
 *   endpoint is available
 */
function pickEndpoint () {
  // filter() returns a new array and leaves `endpoints` untouched, so the selection
  // has to be made from its result; previously the result was thrown away and the
  // greylist had no effect.
  const available = endpoints.filter(candidate => !greylist.includes(candidate))
  return randSelect(available)
}
// Endpoints that are temporarily excluded from selection.
var greylist = []

/**
 * Put `endpoint` on the greylist for 300000 ms (5 minutes), then take it off again.
 * Does nothing when the endpoint is already listed.
 *
 * @param {*} endpoint entry to greylist
 * @returns {Promise<void>} settles after the entry has been removed again
 */
async function addToGreylist (endpoint) {
  const alreadyListed = greylist.indexOf(endpoint) !== -1
  if (alreadyListed) return

  greylist.push(endpoint)
  logger.info('Greylist ADD:', greylist)

  await sleep(300000)

  // The list may have changed during the wait, so look the entry up again.
  const position = greylist.indexOf(endpoint)
  if (position === -1) return
  greylist.splice(position, 1)
  logger.info('Greylist DEL:', greylist)
}
/**
 * Tells whether `item` is a non-null value whose `typeof` is 'object'
 * (so arrays count, while null, functions and primitives do not).
 *
 * @param {*} item value to inspect
 * @returns {boolean}
 */
function isObject (item) {
  if (item === null) return false
  return typeof item === 'object'
}
/**
 * Add `amount` to the counter `metrics[endpoint][key]`, creating the per-endpoint
 * record and the counter when they do not exist yet.
 */
function bumpMetric (endpoint, key, amount) {
  if (!metrics[endpoint]) metrics[endpoint] = {}
  const counters = metrics[endpoint]
  counters[key] = (counters[key] || 0) + amount
}

/**
 * Forward an incoming request to a randomly picked endpoint and return the upstream
 * response, retrying until a usable response is obtained.
 *
 * Behaviour:
 * - no endpoint available: wait 10 s and start over;
 * - the request fails, or the response body is not an object: wait 1 s and start over;
 * - status 500 whose body carries an error code listed in `repeatCodes`: wait 1 s and
 *   start over;
 * - anything else: return the response.
 *
 * Metrics recorded per endpoint: one counter per HTTP status seen, `total` (number of
 * responses), `elapsed` (summed milliseconds until the status was known) and `'5000'`
 * (number of failed requests).
 *
 * NOTE(review): retries are unbounded, so a request keeps waiting for as long as the
 * upstream keeps failing.
 *
 * @param {object} req incoming request; `originalUrl`, `method`, `body` and `params` are read
 * @returns {Promise<object>} the upstream response (`status`, `headers`, `data` are used by the caller)
 */
async function doQuery (req) {
  const endpoint = pickEndpoint()
  if (!endpoint) {
    await sleep(10000)
    return doQuery(req)
  }

  const startedAt = Date.now()
  const response = await ax({
    url: endpoint + req.originalUrl,
    method: req.method,
    timeout: 5000,
    // Called with the status of every HTTP response, which makes it the place where
    // per-status metrics are collected before the response is accepted or rejected.
    validateStatus: function (status) {
      const elapsed = Date.now() - startedAt
      logger.info(`CALL ${endpoint} ${JSON.stringify(req.body)} ${JSON.stringify(req.params)} ${status}`)
      bumpMetric(endpoint, status, 1)
      // Record total executed transactions
      bumpMetric(endpoint, 'total', 1)
      // Record response times
      bumpMetric(endpoint, 'elapsed', elapsed)
      return status < 501
    },
    data: req.body
  }).catch((err) => {
    logger.error('RPC Error:')
    logger.error(endpoint, err.message)
    // '5000' is the pseudo status used for failed requests. The old code tested the
    // key '50000' here, so the counter was reset to 1 on every failure.
    bumpMetric(endpoint, '5000', 1)
  })

  // `response` is undefined when the catch handler above ran.
  if (!response || !isObject(response.data)) {
    await sleep(1000)
    return doQuery(req)
  }

  if (response.status === 500) {
    // A 500 body is not guaranteed to carry an `error` member; read the code defensively
    // instead of throwing a TypeError.
    const errorCode = response.data.error ? response.data.error.code : undefined
    logger.error('')
    logger.error('500 ERROR')
    logger.error(JSON.stringify(response.data))
    logger.error('')
    logger.error(errorCode)
    // Error codes for which the request is repeated instead of returned to the caller.
    const repeatCodes = [3081001, 3010008]
    if (repeatCodes.includes(errorCode)) {
      await sleep(1000)
      return doQuery(req)
    }
  }

  return response
}
/**
 * Start the proxy: load the endpoint list, start the periodic metrics write,
 * register the catch-all route and listen on 127.0.0.1:3055.
 *
 * Route behaviour:
 * - a request whose URL is exactly '/metrics' is answered with the `metrics` object;
 * - every other request is forwarded through doQuery() and the upstream status,
 *   headers and body are relayed to the client;
 * - when forwarding throws, the error is logged and the client receives a 502
 *   instead of being left without any response.
 *
 * @returns {Promise<void>}
 */
async function init () {
  // Headers that describe a single connection rather than the message; a proxy must
  // not pass them on, and relaying e.g. 'transfer-encoding' would contradict the
  // framing chosen when the body is re-sent below.
  const hopByHopHeaders = [
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'transfer-encoding',
    'upgrade'
  ]

  await syncEndpoints()
  await syncMetrics()

  app.all('*', async (req, res) => {
    if (req.url === '/metrics') {
      res.json(metrics)
      return
    }
    // Errors thrown inside an async handler are not caught by the framework here, so
    // they are handled locally to make sure the client always gets an answer.
    try {
      const response = await doQuery(req)
      for (const [name, value] of Object.entries(response.headers)) {
        if (hopByHopHeaders.includes(name.toLowerCase())) continue
        res.setHeader(name, value)
      }
      res.status(response.status)
      res.send(response.data)
    } catch (err) {
      logger.error('Proxy Error:', err.message)
      if (!res.headersSent) {
        res.status(502).json({ message: 'Upstream RPC request failed' })
      }
    }
  })

  app.listen(3055, '127.0.0.1', function () { logger.info('rpcProxy listening on port 3055') })
}
// Start the proxy. A start-up failure is logged and ends the process with a non-zero
// exit status, so a supervisor can tell it apart from a clean shutdown
// (a bare process.exit() reports success).
init().catch((err) => {
  logger.error(err.message)
  process.exit(1)
})