-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththread.h
110 lines (87 loc) · 2.47 KB
/
thread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
* Copyright (C) jlijian3@gmail.com
*/
#ifndef __PS_THREAD_INCLUDE__
#define __PS_THREAD_INCLUDE__
#include <event.h>
#include <pthread.h>
#include <errno.h>
#include "queue.h"
#include "connection.h"
#include "base.h"
struct cq_item {
cq_item() {}
cq_item(int fd, enum conn_states state, int evflags) :
sfd(fd),
init_state(state),
event_flags(evflags)
{
}
int sfd;
enum conn_states init_state;
int event_flags;
struct sockaddr_storage addr;
};
class LibeventThread : public BaseThread {
public:
LibeventThread() : _base(NULL) {
}
~LibeventThread() {
if (_base)
event_base_free(_base);
}
bool init();
bool stop();
struct event_base *get_event_base() {
return _base;
}
void set_event_base(struct event_base *base) {
_base = base;
}
void cq_notify() {
/*
if (write(_notify_send_fd, "", 1) != 1) {
perror("Writing to thread notify pipe");
}
*/
int rv, cnt = 0;
do {
rv = write(_notify_send_fd, "", 1);
} while (rv < 0 && errno == EAGAIN && ++cnt < 100);
}
void push_q_notify(int fd) {
push_q.push(fd);
/*
if (write(_push_send_fd, "", 1) != 1) {
perror("Writing to thread notify pipe");
}
*/
int rv, cnt = 0;
do {
rv = write(_push_send_fd, "", 1);
} while (rv < 0 && errno == EAGAIN && ++cnt < 100);
}
static void thread_libevent_process(int fd, short which, void *arg);
static void thread_push_event_process(int fd, short which, void *arg);
public:
LockQueue<cq_item> cq; /* queue of new connections to handle */
LockQueue<int> push_q; /* queue of new push event to handle */
protected:
int do_thread_func();
private:
struct event_base *_base; /* libevent handle this thread uses */
struct event _notify_event; /* listen event for notify pipe */
int _notify_receive_fd; /* receiving end of notify pipe */
int _notify_send_fd; /* sending end of notify pipe */
struct event _push_event; /* listen event for push pipe */
int _push_receive_fd; /* receiving end of push pipe */
int _push_send_fd; /* sending end of push pipe */
};
void thread_init();
void thread_stop();
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
const struct sockaddr_storage *addr);
void accept_new_conns(bool do_accept);
LibeventThread *get_main_thread();
LibeventThread* get_worker_thread(int i);
#endif /* __PS_THREAD_INCLUDE__ */