forked from booksbyus/zguide
-
Notifications
You must be signed in to change notification settings - Fork 23
/
asyncsrv.cs
183 lines (162 loc) · 4.85 KB
/
asyncsrv.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using ZeroMQ;
namespace Examples
{
static partial class Program
{
static void AsyncSrv_Client(ZContext context, int i)
{
    //
    // Asynchronous client-to-server (DEALER to ROUTER)
    //
    // The example runs inside a single process only because that makes it
    // easy to start and stop; conceptually every task acts as a separate
    // process.
    //
    // Author: metadings
    //
    // Client task.
    // Connects a DEALER socket to the server and then loops forever: it
    // polls for replies (up to 100 polls of 10 ms each), printing every
    // reply as it arrives, and then sends the next numbered request.
    // Several of these tasks run in parallel, told apart by the index i.
    using (var dealer = new ZSocket(context, ZSocketType.DEALER))
    {
        // A readable identity makes tracing easier
        dealer.Identity = Encoding.UTF8.GetBytes("CLIENT" + i);
        dealer.Connect("tcp://127.0.0.1:5570");

        var receiver = ZPollItem.CreateReceiver();
        TimeSpan pollTimeout = TimeSpan.FromMilliseconds(10);
        int sent = 0;
        ZError err;
        ZMessage reply;

        for (;;)
        {
            // Collect whatever arrives before the next request goes out
            for (int tick = 0; tick < 100; tick++)
            {
                if (client_received(dealer.PollIn(receiver, out reply, out err, pollTimeout)))
                {
                    using (reply)
                    {
                        Console.WriteLine("[CLIENT{0}] {1}", i, reply[0].ReadString());
                    }
                    continue;
                }

                if (err == ZError.ETERM)
                {
                    return;    // Interrupted
                }
                if (err == ZError.EAGAIN)
                {
                    // Nothing arrived within the poll timeout
                    Thread.Sleep(1);
                    continue;
                }
                throw new ZException(err);
            }

            using (var request = new ZMessage())
            {
                // First frame repeats our identity, second frame is the payload
                request.Add(new ZFrame(dealer.Identity));
                sent += 1;
                request.Add(new ZFrame("request " + sent));

                bool delivered = dealer.Send(request, out err);
                if (!delivered)
                {
                    if (err == ZError.ETERM)
                    {
                        return;    // Interrupted
                    }
                    throw new ZException(err);
                }
            }
        }
    }
}

// Identity helper: names the boolean result of PollIn at the call site.
static bool client_received(bool pollResult)
{
    return pollResult;
}
static void AsyncSrv_ServerTask(ZContext context)
{
    // Server task.
    // Binds a ROUTER socket for the clients and a DEALER socket for the
    // workers, starts five worker threads, and then hands both sockets to
    // ZContext.Proxy. A worker handles one request at a time, but a single
    // client can be talking to several workers at once.
    using (var router = new ZSocket(context, ZSocketType.ROUTER))
    {
        using (var dealer = new ZSocket(context, ZSocketType.DEALER))
        {
            // Clients reach the frontend over TCP
            router.Bind("tcp://*:5570");
            // Workers reach the backend over inproc
            dealer.Bind("inproc://backend");

            // Start the worker pool; the exact size is not critical
            int workerId = 0;
            while (workerId < 5)
            {
                // Per-iteration copy, so each closure sees its own index
                int id = workerId;
                var workerThread = new Thread(() => AsyncSrv_ServerWorker(context, id));
                workerThread.Start();
                workerId++;
            }

            // Shuttle messages between frontend and backend
            ZError proxyError;
            bool proxied = ZContext.Proxy(router, dealer, out proxyError);
            if (proxied || proxyError == ZError.ETERM)
            {
                return;    // Finished or interrupted
            }
            throw new ZException(proxyError);
        }
    }
}
static void AsyncSrv_ServerWorker(ZContext context, int i)
{
    // Worker task.
    // Handles one request at a time and sends a random number (0..4) of
    // replies back, with a random delay of 1..1000 ms before each reply.
    //
    // context: the ZContext the worker's DEALER socket is created on.
    // i:       index of this worker; mixed into the random seed below.
    //
    // Returns when a socket operation reports ZError.ETERM; any other
    // socket error is thrown as a ZException.
    using (var worker = new ZSocket(context, ZSocketType.DEALER))
    {
        worker.Connect("inproc://backend");
        ZError error;
        ZMessage request;

        // The workers are created back-to-back. On .NET Framework the
        // parameterless Random constructor seeds from the system clock, so
        // workers created within the same clock tick would all produce the
        // same reply counts and delays. Mixing in the worker index keeps
        // their sequences distinct.
        var rnd = new Random(unchecked((Environment.TickCount * 31) + i));

        while (true)
        {
            if (null == (request = worker.ReceiveMessage(out error)))
            {
                if (error == ZError.ETERM)
                    return;    // Interrupted
                throw new ZException(error);
            }
            using (request)
            {
                // Frames [1] and [2] are read below. A shorter message has
                // nothing to echo, so drop it instead of letting an
                // index-out-of-range exception escape from this thread.
                if (request.Count < 3)
                {
                    continue;
                }

                // The DEALER socket gives us the reply envelope and message
                string identity = request[1].ReadString();
                string content = request[2].ReadString();

                // Send 0..4 replies back
                int replies = rnd.Next(5);
                for (int reply = 0; reply < replies; ++reply)
                {
                    // Sleep for some fraction of a second
                    Thread.Sleep(rnd.Next(1000) + 1);

                    using (var response = new ZMessage())
                    {
                        // Identity frame first, then the echoed content
                        response.Add(new ZFrame(identity));
                        response.Add(new ZFrame(content));
                        if (!worker.Send(response, out error))
                        {
                            if (error == ZError.ETERM)
                                return;    // Interrupted
                            throw new ZException(error);
                        }
                    }
                }
            }
        }
    }
}
public static void AsyncSrv(string[] args)
{
    // Entry point of the example.
    // Starts five client threads and one server thread on a shared
    // context, lets them run for five seconds, and then leaves the using
    // block, which disposes the context. The args parameter is not used.
    using (var ctx = new ZContext())
    {
        int clientId = 0;
        while (clientId < 5)
        {
            // Per-iteration copy, so each closure sees its own index
            int id = clientId;
            var clientThread = new Thread(() => AsyncSrv_Client(ctx, id));
            clientThread.Start();
            clientId++;
        }

        ThreadStart serverEntry = () => AsyncSrv_ServerTask(ctx);
        var serverThread = new Thread(serverEntry);
        serverThread.Start();

        // Run for 5 seconds then quit
        Thread.Sleep(TimeSpan.FromSeconds(5));
    }
}
}
}