forked from tigerbeetle/tigerbeetle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfast-ml-api-adapter.js
165 lines (148 loc) · 4.42 KB
/
fast-ml-api-adapter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
const NAMESPACE = 'fast-ml-api-adapter';
const assert = require('assert');
const Node = {
crypto: require('crypto'),
http: require('http'),
net: require('net'),
process: process
};
const TigerBeetle = require('./client.js');
const LEV = require('./log-event.js');
// Log event (LEV) settings. LEV comes from ./log-event.js; logging is left
// switched off here (presumably HOST/PORT address the log event server — confirm
// in log-event.js).
// Provide the IP address of the log event server if you want to enable logging:
LEV.ENABLED = false;
LEV.HOST = '127.0.0.1';
LEV.PORT = 4444;
// Address and port that this adapter's own HTTP server listens on
// (passed to server.listen() in CreateServer):
const HOST = '0.0.0.0';
const PORT = 3000;
const LOCALHOST = '127.0.0.1';
// TigerBeetle endpoint, passed to TigerBeetle.connect() at startup:
const TIGER_BEETLE_HOST = 'tb.perf.openafrica.network';
const TIGER_BEETLE_PORT = 30000;
// Test harness payee (prepare notifications are POSTed to this host and port):
const PAYEE_HOST = LOCALHOST; //'10.126.10.139';
const PAYEE_PORT = 3333;
// Test harness payer (fulfill notifications are POSTed to this host and port):
const PAYER_HOST = LOCALHOST; //'10.126.10.139';
const PAYER_PORT = 7777;
// Last-resort handler for exceptions nobody caught:
// Print the error and report it through LEV. The handler does not exit, so the
// process keeps running afterwards.
Node.process.on('uncaughtException', (error) => {
  console.log(error);
  LEV(`${NAMESPACE}: UNCAUGHT EXCEPTION: ${error}`);
});
// Detect event loop stalls within fast-ml-api-adapter:
// A timer is scheduled every 5ms. Whenever it fires more than 10ms later than
// it was expected to, the stall (expected time, actual time) is reported via LEV.
(function() {
  const INTERVAL_MS = 5;
  let previous = Date.now();

  function tick() {
    // The timer should have fired INTERVAL_MS after the previous tick.
    const expected = previous + INTERVAL_MS;
    const now = Date.now();
    const lateness = now - expected;
    if (lateness > 10) {
      LEV({
        start: expected,
        end: now,
        label: `${NAMESPACE}: event loop blocked for ${lateness}ms`
      });
    }
    previous = now;
  }

  setInterval(tick, INTERVAL_MS);
})();
// Start the adapter's HTTP server on HOST:PORT.
//
// The full request body is buffered first, then the request is routed on
// request.url:
//   '/transfers'        Incoming prepare: TigerBeetle.encodeCreate() +
//                       TigerBeetle.create(), then notify the payee.
//   url.length > 36     Incoming fulfill: the transfer id is the third
//                       '/'-separated segment of the URL;
//                       TigerBeetle.encodeAccept() + TigerBeetle.accept(),
//                       then notify the payer.
//   anything else       Logged; the response is ended with the default status.
//
// A prepare/fulfill is ACKed with 202 only after both the TigerBeetle callback
// and the notification callback have fired.
// A body that cannot be parsed as JSON (or that the encoder rejects by
// throwing) is answered with 400 instead of leaving the response open.
function CreateServer() {
  // Shared prepare/fulfill pipeline:
  // decode the buffered JSON body, encode it with `encode`, submit the encoded
  // target through `commit`, POST the notification to host:port and finally
  // ACK the original request.
  function Relay(request, response, buffers, encode, commit, host, port) {
    const source = Buffer.concat(buffers);
    let target;
    try {
      // NOTE(review): decoding as 'ascii' clears the high bit of every byte,
      // so non-ASCII JSON text would be altered — confirm payloads are ASCII.
      target = encode(JSON.parse(source.toString('ascii')));
    } catch (error) {
      // Without this, the exception would escape the 'end' handler and the
      // client would never receive a response.
      console.log(`invalid request body for ${request.url}: ${error}`);
      response.statusCode = 400;
      response.end();
      return;
    }
    commit(target, function() {
      // Send notification:
      // We reuse the request path as the notification path is the same.
      // We reuse the request payload to avoid any GC complications.
      // This can always be optimized later without material overhead.
      PostNotification(host, port, request.url, source,
        function() {
          // ACK:
          response.statusCode = 202;
          response.end();
        }
      );
    });
  }

  const server = Node.http.createServer({},
    function(request, response) {
      const buffers = [];
      request.on('data', function(buffer) { buffers.push(buffer); });
      request.on('end',
        function() {
          if (request.url === '/transfers') {
            // Handle an incoming prepare:
            Relay(request, response, buffers,
              function(object) { return TigerBeetle.encodeCreate(object); },
              function(target, done) { TigerBeetle.create(target, done); },
              PAYEE_HOST, PAYEE_PORT
            );
          } else if (request.url.length > 36) {
            // TODO Improve request.url validation and parsing.
            // Handle an incoming fulfill:
            const id = request.url.split('/')[2];
            Relay(request, response, buffers,
              function(object) { return TigerBeetle.encodeAccept(id, object); },
              function(target, done) { TigerBeetle.accept(target, done); },
              PAYER_HOST, PAYER_PORT
            );
          } else {
            // Status code is intentionally left at the default here so that
            // existing callers of other paths see no change.
            console.log(`unknown request.url: ${request.url}`);
            response.end();
          }
        }
      );
    }
  );
  server.listen(PORT, HOST,
    function() {
      LEV(`${NAMESPACE}: Listening on ${HOST}:${PORT}...`);
    }
  );
}
// POST `body` to host:port at `path` through the shared keep-alive
// ConnectionPool, and call `end` exactly once when the exchange is over.
//
//   host, port, path  Destination of the notification.
//   body              Payload (Buffer or string); sent unchanged.
//   end               Completion callback. Called with no arguments once the
//                     response has been fully received, or with the Error if
//                     the request or the response stream failed. Callbacks
//                     that take no parameters keep working unchanged.
function PostNotification(host, port, path, body, end) {
  // A failed exchange can surface on the request and on the response stream,
  // so make sure the callback only ever fires once.
  let finished = false;
  function finish(error) {
    if (finished) return;
    finished = true;
    if (error) {
      console.log(error);
      LEV(`${NAMESPACE}: notification to ${host}:${port}${path} failed: ${error}`);
      end(error);
    } else {
      end();
    }
  }

  const headers = {
    // Byte length, not character count: the two differ for string bodies that
    // contain multi-byte characters.
    'Content-Length': Buffer.byteLength(body)
  };
  const options = {
    agent: ConnectionPool,
    method: 'POST',
    host: host,
    port: port,
    path: path,
    headers: headers
  };
  const request = Node.http.request(options,
    function(response) {
      response.on('error', finish);
      response.on('end', function() { finish(); });
      // The response body is not needed, but it must be drained for 'end' to
      // fire and for the socket to go back to the pool.
      response.resume();
    }
  );
  // Without a listener, a refused or reset connection would be thrown as an
  // uncaught exception and `end` would never be called.
  request.on('error', finish);
  request.end(body);
}
// Create a keep-alive HTTP request connection pool:
// We don't want each and every notification to do a TCP handshake...
// This is critical. The lack of this causes multi-second event loop blocks.
// The pool is passed as the `agent` option of every request made by
// PostNotification.
const ConnectionPool = new Node.http.Agent({
// Keep sockets open between requests so they can be reused:
keepAlive: true,
// Upper bound on idle sockets kept open (http.Agent option):
maxFreeSockets: 10000,
// Socket timeout in milliseconds (60 seconds):
timeout: 60 * 1000
});
// Boot sequence: start the HTTP server, then connect to TigerBeetle.
// A connection error reported to the callback is rethrown from it.
CreateServer();
TigerBeetle.connect(TIGER_BEETLE_HOST, TIGER_BEETLE_PORT, (error) => {
  if (error) {
    throw error;
  }
});