-
Notifications
You must be signed in to change notification settings - Fork 2
/
tcp-non-blocking-kqueue-echo-server.c
156 lines (147 loc) · 4.76 KB
/
tcp-non-blocking-kqueue-echo-server.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/event.h>
#include <sys/socket.h>
#include <unistd.h>
/*
 * exit_if(cond, fmt, ...): when `cond` is non-zero, print the printf-style
 * message, then the errno number and its text, and terminate with status 1.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement and is safe inside an unbraced if/else. errno is captured before
 * the first printf so the reported value cannot be overwritten by the
 * printing itself.
 */
#define exit_if(r, ...) \
    do \
    { \
        if (r) \
        { \
            int exit_if_saved_errno = errno; \
            printf(__VA_ARGS__); \
            printf("error no: %d error msg %s\n", exit_if_saved_errno, strerror(exit_if_saved_errno)); \
            exit(1); \
        } \
    } while (0)
/* Interest bits understood by updateEvents(); they can be OR-ed together. */
const int kReadEvent = 0x1;  /* caller wants read events */
const int kWriteEvent = 0x2; /* caller wants write events */
/*
 * Put `fd` into non-blocking mode by OR-ing O_NONBLOCK into its current file
 * status flags. If either fcntl call fails, the process exits via exit_if.
 */
void setNonBlock(int fd)
{
    const int current = fcntl(fd, F_GETFL, 0);
    exit_if(current < 0, "fcntl failed");

    const int rc = fcntl(fd, F_SETFL, current | O_NONBLOCK);
    exit_if(rc < 0, "fcntl failed");
}
/*
 * Apply the interest set `events` (a mask of kReadEvent / kWriteEvent) for
 * descriptor `fd` on kqueue `efd`.
 *
 * For each of the read and write filters:
 *   - if its bit is set in `events`, the filter is added and enabled;
 *   - otherwise, when `modify` is true, the filter is deleted;
 *   - otherwise nothing is submitted for that filter.
 *
 * The descriptor itself is stored as the event's udata. A non-zero return
 * from kevent terminates the process via exit_if.
 */
void updateEvents(int efd, int fd, int events, bool modify)
{
    struct kevent changes[2];
    int count = 0;
    void *tag = (void *)(intptr_t)fd;
    const bool wantRead = (events & kReadEvent) != 0;
    const bool wantWrite = (events & kWriteEvent) != 0;

    /* The read filter always precedes the write filter in the changelist. */
    if (wantRead || modify)
    {
        EV_SET(&changes[count], fd, EVFILT_READ,
               wantRead ? (EV_ADD | EV_ENABLE) : EV_DELETE, 0, 0, tag);
        count++;
    }
    if (wantWrite || modify)
    {
        EV_SET(&changes[count], fd, EVFILT_WRITE,
               wantWrite ? (EV_ADD | EV_ENABLE) : EV_DELETE, 0, 0, tag);
        count++;
    }

    printf("%s fd %d events read %d write %d\n",
           modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);

    int rc = kevent(efd, changes, count, NULL, 0, NULL);
    exit_if(rc, "kevent failed ");
}
/*
 * Accept one pending connection on the listening socket `fd`, switch the new
 * client socket to non-blocking mode, and register it on kqueue `efd` for
 * both read and write events.
 *
 * Transient accept failures return quietly instead of terminating the server:
 * the listener is non-blocking, so EAGAIN/EWOULDBLOCK simply means no
 * connection is pending any more, ECONNABORTED means the client gave up
 * before we accepted, and EINTR means a signal interrupted the call. Any
 * other accept failure is still fatal via exit_if.
 *
 * The peer address is taken from accept itself; no separate getpeername call
 * is made, since its result was never used and a failure there (for example
 * when the client has already disconnected) would needlessly kill the server.
 */
void handleAccept(int efd, int fd)
{
    struct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    int cfd = accept(fd, (struct sockaddr *)&raddr, &rsz);
    if (cfd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK ||
                    errno == ECONNABORTED || errno == EINTR))
    {
        return; /* nothing to accept right now; wait for the next event */
    }
    exit_if(cfd < 0, "accept failed");
    printf("accept a connection from %s\n", inet_ntoa(raddr.sin_addr));
    setNonBlock(cfd);
    updateEvents(efd, cfd, kReadEvent | kWriteEvent, false);
}
/*
 * Drain readable data from client socket `fd` and echo every byte back.
 *
 * Reads in 4096-byte chunks until read reports EAGAIN/EWOULDBLOCK (no more
 * data for now: return and keep the connection), end-of-file (peer closed:
 * close `fd`), or an error.
 *
 * Each chunk is written back in a loop so that a short write does not lose
 * the unwritten tail. EINTR on either read or write is retried.
 *
 * Any other I/O failure affects only this connection: it is reported and
 * `fd` is closed, rather than terminating the whole server. That includes
 * EAGAIN on write, because this demo keeps no per-connection output buffer;
 * a real server would queue the remainder and wait for a writable event.
 *
 * `efd` is unused; it is kept so all handlers share the same signature.
 */
void handleRead(int efd, int fd)
{
    (void)efd;
    char buf[4096];
    for (;;)
    {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0)
        {
            printf("read %d bytes\n", (int)n);
            ssize_t done = 0;
            while (done < n)
            {
                ssize_t w = write(fd, buf + done, (size_t)(n - done));
                if (w > 0)
                {
                    done += w;
                    continue;
                }
                if (w < 0 && errno == EINTR)
                {
                    continue; /* interrupted before writing anything: retry */
                }
                /* Cannot make progress and nowhere to buffer: drop the client. */
                printf("write error");
                printf("error no: %d error msg %s\n", errno, strerror(errno));
                printf("fd %d closed\n", fd);
                close(fd);
                return;
            }
            continue;
        }
        if (n == 0)
        {
            break; /* peer closed the connection */
        }
        if (errno == EINTR)
        {
            continue;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK)
        {
            return; /* drained everything currently available */
        }
        printf("read error");
        printf("error no: %d error msg %s\n", errno, strerror(errno));
        break;
    }
    printf("fd %d closed\n", fd);
    close(fd);
}
/*
 * Writable-event handler. It re-registers `fd` on `efd` with read interest
 * only and modify=true, which makes updateEvents delete the write filter.
 *
 * A real application would write out pending data here and drop write
 * interest only once nothing is left to send.
 */
void handleWrite(int efd, int fd)
{
    const int readOnlyInterest = kReadEvent;
    updateEvents(efd, fd, readOnlyInterest, true);
}
/*
 * Run one iteration of the event loop: wait up to `waitms` milliseconds on
 * kqueue `efd`, then dispatch each returned event.
 *
 *   - read event on `lfd` (the listening socket) -> handleAccept
 *   - read event on any other descriptor         -> handleRead
 *   - write event                                -> handleWrite
 *   - any other filter terminates the process via exit_if
 *
 * The descriptor for each event is recovered from udata, where updateEvents
 * stored it.
 *
 * Events in one batch are a snapshot: if an earlier handler in the same batch
 * closed a descriptor (handleRead does so on end-of-file), a later event for
 * that descriptor is stale. Such events are skipped, because passing a closed
 * descriptor on to handleWrite would make kevent fail with EBADF and take the
 * whole server down.
 */
void loop_once(int efd, int lfd, int waitms)
{
    /* An enum constant makes activeEvs a fixed-size array, not a VLA. */
    enum { kMaxEvents = 20 };

    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;

    struct kevent activeEvs[kMaxEvents];
    int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
    printf("epoll_wait return %d\n", n);
    for (int i = 0; i < n; i++)
    {
        int fd = (int)(intptr_t)activeEvs[i].udata;
        int events = activeEvs[i].filter;

        /* Skip events whose descriptor was closed earlier in this batch. */
        if (fd != lfd && fcntl(fd, F_GETFD) < 0 && errno == EBADF)
        {
            continue;
        }

        if (events == EVFILT_READ)
        {
            if (fd == lfd)
            {
                handleAccept(efd, fd);
            }
            else
            {
                handleRead(efd, fd);
            }
        }
        else if (events == EVFILT_WRITE)
        {
            handleWrite(efd, fd);
        }
        else
        {
            exit_if(1, "unknown event");
        }
    }
}
/*
 * Entry point: create a kqueue, open a TCP listening socket bound to
 * 0.0.0.0:99 with a backlog of 20, make it non-blocking, register it for read
 * events, and then run loop_once forever with a 10000 ms wait per iteration.
 * Any setup failure terminates the process via exit_if.
 */
int main()
{
    short port = 99;

    int kq = kqueue();
    exit_if(kq < 0, "epoll_create failed");

    int lsock = socket(AF_INET, SOCK_STREAM, 0);
    exit_if(lsock < 0, "socket failed");

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;

    int rc = bind(lsock, (struct sockaddr *)&addr, sizeof(struct sockaddr));
    exit_if(rc, "bind to 0.0.0.0:%d failed %d %s", port, errno, strerror(errno));

    rc = listen(lsock, 20);
    exit_if(rc, "listen failed %d %s", errno, strerror(errno));
    printf("fd %d listening at %d\n", lsock, port);

    setNonBlock(lsock);
    updateEvents(kq, lsock, kReadEvent, false);

    /* A real application should install signal handlers and release
     * resources on exit. */
    for (;;)
    {
        loop_once(kq, lsock, 10000);
    }
    return 0;
}