-
Notifications
You must be signed in to change notification settings - Fork 15
/
ZMQ.cpp
128 lines (100 loc) · 4.15 KB
/
ZMQ.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
*
* (C) 2021-24 - ntop.org
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*/
#include "include.h"
#ifdef HAVE_ZMQ
#define DEFAULT_ZMQ_TCP_KEEPALIVE 1 /* Keepalive ON */
#define DEFAULT_ZMQ_TCP_KEEPALIVE_IDLE 30 /* Keepalive after 30 seconds */
#define DEFAULT_ZMQ_TCP_KEEPALIVE_CNT 3 /* Keepalive send 3 probes */
#define DEFAULT_ZMQ_TCP_KEEPALIVE_INTVL 3 /* Keepalive probes sent every 3 seconds */
#define MAX_SOCKET_BUFFER_SIZE 8388608 /* 8 MB */
/* ******************************* */
ZMQ::ZMQ(const char *endpoint, const char *server_public_key) {
context = zmq_ctx_new();
if(context == NULL) {
trace->traceEvent(TRACE_ERROR, "[ERROR] Unable to create ZMQ context");
exit(1);
}
zmq_socket_handler = zmq_socket(context, ZMQ_PUB /* always publisher: we produce messages */);
if(zmq_socket_handler == NULL) {
trace->traceEvent(TRACE_ERROR, "Unable to create ZMQ socket");
exit(1);
}
if(server_public_key != NULL) {
char client_public_key[41];
char client_secret_key[41];
int rc;
if(strlen(server_public_key) != 40) {
trace->traceEvent(TRACE_ERROR, "Bad ZMQ server public key size (%lu != 40)", strlen(server_public_key));
goto no_encrypt;
}
trace->traceEvent(TRACE_INFO, "Setting ZMQ server curve key to '%s'", server_public_key);
/* (1) - Generate client keypairs */
rc = zmq_curve_keypair(client_public_key, client_secret_key);
if(rc != 0) {
trace->traceEvent(TRACE_ERROR, "Error generating ZMQ client key pair");
goto no_encrypt;
}
/* (2) - Enable client secret key */
rc = zmq_setsockopt(zmq_socket_handler, ZMQ_CURVE_SECRETKEY, client_secret_key, 41);
if(rc != 0) {
trace->traceEvent(TRACE_ERROR, "Error setting ZMQ_CURVE_SECRETKEY = %s", client_secret_key);
goto no_encrypt;
}
/* (3) - Enable client public key */
rc = zmq_setsockopt(zmq_socket_handler, ZMQ_CURVE_PUBLICKEY, client_public_key, 41);
if(rc != 0) {
trace->traceEvent(TRACE_ERROR, "Error setting ZMQ_CURVE_PUBLICKEY = %s", client_public_key);
goto no_encrypt;
}
/* (4) - Set the public server key generated on the server side */
rc = zmq_setsockopt(zmq_socket_handler, ZMQ_CURVE_SERVERKEY, server_public_key, 41);
if(rc != 0) {
trace->traceEvent(TRACE_ERROR, "Error setting ZMQ_CURVE_SERVERKEY = %s (%d)", server_public_key, errno);
goto no_encrypt;
}
no_encrypt:
;
}
if(zmq_connect(zmq_socket_handler, endpoint) != 0) {
trace->traceEvent(TRACE_ERROR, "Unable to connect to ZMQ endpoint %s [%s]", endpoint, strerror(errno));
throw "ZMQ connect error";
}
};
/* ******************************* */
ZMQ::~ZMQ() {
zmq_close(zmq_socket_handler);
zmq_ctx_destroy(context);
};
/* ******************************* */
void ZMQ::sendMessage(const char *topic, const char *msg) {
struct zmq_msg_hdr msg_hdr;
u_int len = strlen(msg);
memset(&msg_hdr, 0, sizeof(msg_hdr));
snprintf(msg_hdr.url, sizeof(msg_hdr.url), "%s", topic);
msg_hdr.source_id = 999, msg_hdr.version = ZMQ_MSG_VERSION, msg_hdr.size = len;
if(zmq_send(zmq_socket_handler, &msg_hdr, sizeof(msg_hdr), ZMQ_SNDMORE) != sizeof(msg_hdr))
trace->traceEvent(TRACE_WARNING, "ZMQ send errror");
if(zmq_send(zmq_socket_handler, msg, msg_hdr.size, 0) != msg_hdr.size)
trace->traceEvent(TRACE_WARNING, "ZMQ send errror");
else
trace->traceEvent(TRACE_INFO, "Sent [topic: %s][msg: %s]", msg_hdr.url, msg);
}
/* ******************************* */
#endif /* HAVE_ZMQ */