-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard_connection.h
174 lines (138 loc) · 5.18 KB
/
shard_connection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
* Copyright (C) 2011-2017 Redis Labs Ltd.
*
* This file is part of memtier_benchmark.
*
* memtier_benchmark is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 2.
*
* memtier_benchmark is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with memtier_benchmark. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MEMTIER_BENCHMARK_SHARD_CONNECTION_H
#define MEMTIER_BENCHMARK_SHARD_CONNECTION_H
#include <poll.h>
#include <queue>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <event2/event.h>
#include <event2/buffer.h>
#include "protocol.h"
#include "generator.h"
// forward decleration
class connections_manager;
struct benchmark_config;
class abstract_protocol;
class object_generator;
enum authentication_state { auth_none, auth_sent, auth_done };
enum select_db_state { select_none, select_sent, select_done };
enum cluster_slots_state { slots_none, slots_sent, slots_done };
enum request_type { rt_unknown, rt_set, rt_get, rt_wait, rt_auth, rt_select_db, rt_cluster_slots };
struct request {
request_type m_type;
struct timeval m_sent_time;
unsigned int m_size;
unsigned int m_keys;
request(request_type type, unsigned int size, struct timeval* sent_time, unsigned int keys);
virtual ~request(void) {}
};
struct verify_request : public request {
char *m_key;
unsigned int m_key_len;
char *m_value;
unsigned int m_value_len;
verify_request(request_type type,
unsigned int size,
struct timeval* sent_time,
unsigned int keys,
const char *key,
unsigned int key_len,
const char *value,
unsigned int value_len);
virtual ~verify_request(void);
};
class shard_connection {
friend void cluster_client_event_handler(evutil_socket_t sfd, short evtype, void *opaque);
public:
shard_connection(unsigned int id, connections_manager* conn_man, benchmark_config* config,
struct event_base* event_base, abstract_protocol* abs_protocol);
~shard_connection();
void set_address_port(const char* address, const char* port);
int connect(struct connect_info* addr);
void disconnect();
void send_wait_command(struct timeval* sent_time,
unsigned int num_slaves, unsigned int timeout);
void send_set_command(struct timeval* sent_time, const char *key, int key_len,
const char *value, int value_len, int expiry, unsigned int offset);
void send_get_command(struct timeval* sent_time,
const char *key, int key_len, unsigned int offset);
void send_mget_command(struct timeval* sent_time, const keylist* key_list);
void send_verify_get_command(struct timeval* sent_time, const char *key, int key_len,
const char *value, int value_len, int expiry, unsigned int offset);
void set_authentication() {
m_authentication = auth_none;
}
void set_select_db() {
m_db_selection = select_none;
}
void set_cluster_slots() {
m_cluster_slots = slots_none;
}
unsigned int get_id() {
return m_id;
}
abstract_protocol* get_protocol() {
return m_protocol;
}
const char* get_address() {
return m_address;
}
const char* get_port() {
return m_port;
}
int check_sockfd_writable();
int check_sockfd_readable();
void gurantee_sockfd_dispatch();
int serverTid; // server thread id it connected to
Generator* intervalGenerator; // used to generate the intervals
// between requests. Set qps for this
// client based on qpsPerClient[serverTid]
uint64_t nextCycleTime; // next time to issue a request
private:
void setup_event();
int setup_socket(struct connect_info* addr);
bool is_conn_setup_done();
void send_conn_setup_commands(struct timeval timestamp);
request* pop_req();
void push_req(request* req);
void process_response(void);
void process_first_request();
void fill_pipeline(void);
void handle_event(short evtype);
unsigned int m_id;
connections_manager* m_conns_manager;
benchmark_config* m_config;
int m_sockfd;
char* m_address;
char* m_port;
struct sockaddr_un* m_unix_sockaddr;
struct evbuffer* m_read_buf;
struct evbuffer* m_write_buf;
struct event_base* m_event_base;
struct event* m_event;
abstract_protocol* m_protocol;
std::queue<request *>* m_pipeline;
int m_pending_resp;
bool m_connected;
enum authentication_state m_authentication;
enum select_db_state m_db_selection;
enum cluster_slots_state m_cluster_slots;
};
#endif //MEMTIER_BENCHMARK_SHARD_CONNECTION_H