在本章中,我们将分析并发编程的一个更高级的应用:从头开始构建一个工作的非阻塞服务器。我们将介绍socket
模块的复杂用途,例如将用户业务逻辑与回调隔离,并使用内联生成器编写回调逻辑,这两个实例都设计为并发运行。我们还将通过一个例子讨论await
和yield
关键字的使用。
本章将介绍以下主题:
- 使用
socket
模块中的综合 API 从头构建服务器 - Python 生成器和异步生成器的基本信息
- 如何使用带有
await
和yield
关键字的内联生成器将阻塞服务器转换为非阻塞服务器
以下是本章的先决条件列表:
- 确保计算机上安装了 Python 3
- 确保您的计算机上安装了
telnet
- 在下载 GitHub 存储库 https://github.com/PacktPublishing/Mastering-Concurrency-in-Python
- 在本章中,我们将使用名为
Chapter18
的子文件夹 - 查看以下视频以查看代码的运行:http://bit.ly/2KrgWwh
在本章中,我们将使用socket
模块(Python 中的内置库)来构建工作服务器。socket
模块是最常用于实现低级通信协议的模块之一,同时提供了控制这些协议的直观选项。在本节中,我们将介绍实现服务器底层底层体系结构的过程,以及稍后将在我们的示例中使用的模块的关键方法和功能。
请注意,为了成功地遵循本章中的示例,您需要在系统上安装 telnet 程序。Telnet 是一个提供终端命令的程序,可促进双向、交互式、基于文本的通信协议。我们在第 11 章中介绍了 telnet 的安装,与 asyncio建立通信通道;如果您的系统上尚未安装 Telnet,只需导航到(并按照中的说明)该章。
请注意,macOS 系统有一个预先安装的 Telnet 替代方案,称为 Netcat。如果您不希望在 macOS 计算机上安装 Telnet,只需在以下示例中使用命令nc
而不是telnet
,您将获得相同的效果。
在第 11 章中使用异步 IO**构建通信通道中,您遇到了使用aiohttp
模块在更高级别实现异步通信通道的简要示例。在本节中,我们将深入探讨服务器端通信通道的编程结构,以及它如何以高效的方式与其客户机交互。
在网络编程领域,套接字被定义为特定计算机网络节点内的理论端点。套接字负责从其所在的节点接收或发送数据。套接字对于拥有它的节点是唯一的这一事实意味着同一计算机网络中的其他节点理论上无法与套接字交互。换句话说,套接字仅对其相应的节点可用。
要从服务器端打开通信通道,网络程序员必须首先创建套接字并将其绑定到特定地址。此地址通常是一对值,包含有关主机和服务器端口的信息。然后,通过套接字,服务器开始侦听其客户机在网络中创建的任何潜在通信请求。因此,来自客户端的任何连接到服务器的请求都需要通过创建的套接字。
在收到来自潜在客户机的连接请求后,服务器可以决定是否接受该请求。然后将在网络中的两个系统之间建立连接,这意味着它们可以开始相互通信和共享数据。当客户端通过通信信道向服务器发送消息时,服务器随后处理该消息并最终通过相同信道向客户端发送响应;这个过程一直持续到它们之间的连接结束,或者通过其中一个退出连接通道,或者通过一些外部因素。
前面是创建服务器和与潜在客户机建立连接的基本过程。在该过程的每个阶段都实施了多种安全措施,尽管这些措施不是我们关心的问题,也不会在这里讨论。下图还描绘了刚才描述的流程:
Network programming with sockets
请注意,为了创建连接到服务器的请求,潜在客户机还必须为通信通道初始化自己的套接字(如上图所示)。同样,我们只对这个过程的服务器端理论感兴趣,因此,这里不讨论客户端元素。
在本节中,我们将探索socket
模块提供的关键 API,以在前面描述的过程中实现相同的功能。正如我们提到的,socket
模块内置于任何 Python3 发行版中,因此我们可以简单地将模块导入到我们的程序中,而无需运行安装命令。
要创建套接字,我们将使用返回套接字对象的socket.socket()
方法。在实现各种通信协议的大部分过程中,我们将使用这个对象。此外,套接字方法具有以下方法,可帮助我们控制通信协议:
socket.bind()
:此方法将调用套接字绑定到传递给该方法的地址。在我们的示例中,我们将传入一个元组,其中包含主机地址和通信通道的端口。socket.listen()
:此方法允许我们创建的服务器接受来自潜在客户端的连接。另一个可选的正整数参数可以传递给该方法,以指定服务器拒绝新连接之前允许的未接受连接数。在后面的示例中,我们将使用5
作为此方法的任意数字。socket.accept()
:顾名思义,此方法接受调用套接字对象所拥有的特定连接。此调用对象必须首先绑定到地址并侦听连接才能调用此方法。换句话说,此方法将在前面两个方法之后调用。该方法还返回一对值(conn, address)
,其中conn
是接受连接并能够发送和接收数据的新套接字对象,address
是连接另一端的地址(客户端地址)。socket.makefile()
:此方法返回一个与调用的socket
对象关联的file
对象。我们将使用此方法创建一个文件,其中包含从服务器的已接受客户端发送的数据。此file
对象也需要使用close()
方法适当关闭。socket.sendall()
:此方法将作为参数传递的数据发送给调用的socket
对象。我们将使用此方法将数据发送回连接到服务器的客户端。请注意,此方法以字节为单位接收数据,因此我们将在示例中向此方法传递字节字符串。socket.close()
:此方法将调用的socket
对象标记为关闭。在此之后,socket
对象上应用的所有操作都将失败。这将在我们终止服务器时使用。
要真正理解前面描述的方法和函数的使用,最好的方法是在示例程序中查看它们的实际操作。在本节中,我们将构建一个 echo 服务器作为开始示例。这个服务器,顾名思义,将从每个客户机接收到的任何内容发送回客户机。通过本例,您将了解如何设置功能性服务器,以及如何处理来自该服务器的客户端连接和数据,我们将在后面的部分中构建更复杂的服务器。
但是,在开始编写代码之前,让我们先讨论一下将实现此服务器通信逻辑的程序的结构。首先,我们将拥有所谓的反应器,它设置服务器本身,并在潜在客户请求新连接时提供逻辑。具体地说,一旦服务器被设置好,这个反应器将通过一个无限循环来处理服务器接收到的所有连接请求。
如果您已经阅读了前面关于异步编程的章节,那么也可以将这个反应器看作一个事件循环。此事件循环遍历所有要处理的事件(在本例中,它们是请求),并使用事件处理程序逐个处理它们。下图进一步说明了该过程:
An event loop in network programming
然后,我们程序的第二部分是事件循环类比中的事件处理程序,它包含用户业务逻辑:如何处理从客户端接收的数据,以及向每个客户端发送什么。对于我们当前的示例,因为它是一个 echo 服务器,所以我们只发送每个客户端发送到服务器的内容(如果数据有效)。
考虑到这个结构,让我们继续讨论这个服务器的实际实现。从 GitHub 页面下载本章的代码,然后继续导航到Chapter18
文件夹。我们感兴趣的脚本位于Chapter18/example1.py
文件中,如下所示:
# Chapter18/example1.py
import socket
# Main event loop
def reactor(host, port):
sock = socket.socket()
sock.bind((host, port))
sock.listen(5)
print(f'Server up, running, and waiting for call on {host} {port}')
try:
while True:
conn, cli_address = sock.accept()
process_request(conn, cli_address)
finally:
sock.close()
def process_request(conn, cli_address):
file = conn.makefile()
print(f'Received connection from {cli_address}')
try:
while True:
line = file.readline()
if line:
line = line.rstrip()
if line == 'quit':
conn.sendall(b'connection closed\r\n')
return
print(f'{cli_address} --> {line}')
conn.sendall(b'Echoed: %a\r\n' % line)
finally:
print(f'{cli_address} quit')
file.close()
conn.close()
if __name__ == '__main__':
reactor('localhost', 8080)
该程序的结构与我们前面讨论的相同:一个反应器和一个用户业务逻辑处理程序(即process_request()
函数)。首先,reactor 设置服务器(通过创建套接字,将其绑定到参数化主机和端口地址,并调用listen()
方法)。然后,它进入无限循环并促进与客户端的任何潜在连接,首先通过调用socket
对象上的accept()
方法接受连接,然后调用process_request()
函数。如果在上述过程中发生错误,反应堆还负责关闭socket
对象。
另一方面,process_request()
函数将首先创建一个与传递给它的套接字相关联的file
对象。同样,我们的服务器使用这个file
对象从通过特定套接字连接的客户端读取数据。具体来说,在创建了file
对象之后,该函数将进入另一个无限循环,使用readline()
函数不断读取file
对象。如果从文件读取的数据有效,我们将使用sendall()
方法发回相同的数据。
我们还打印出服务器从每个客户端接收到的内容作为服务器输出,包括行print(f'{cli_address} --> {line}')
。另外一个规范是,如果从文件中读取的数据等于字符串quit
,那么我们将关闭与该特定客户端的连接。在连接关闭后,我们将需要小心地处理socket
对象本身以及与之关联的file
对象,使用close()
方法对两者进行处理。
最后,在我们的程序结束时,我们只需调用reactor()
函数并向其传递有关服务器的信息。在这种情况下,我们只需使用服务器的环回接口,端口为8080
。现在,我们将执行脚本来初始化本地服务器。您的输出应类似于以下内容:
> python3 example1.py
Server up, running, and waiting for call on localhost 8080
此时,服务器已启动并正在运行(如输出中所示)。现在,我们想为这个服务器创建一些客户端。为此,打开另一个终端窗口,通过运行telnet localhost 8080
使用 Telnet 程序连接到正在运行的服务器。您的输出应类似于以下内容:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
此输出表示 Telnet 客户端已成功连接到我们创建的服务器。现在,我们可以测试服务器是否能够按照我们预期的方式处理其请求。具体来说,输入一些数据,点击返回或输入发送到服务器,您会看到客户端会收到来自服务器的回音消息,就像我们在前面process_request()
函数中实现的那样。同样,客户端可以通过向服务器发送字符串quit
来停止与该服务器的连接。
以下代码显示了输入几个不同短语时的输出:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
Echoed: 'hello'
nice
Echoed: 'nice'
fdkgsnas
Echoed: 'fdkgsnas'
quit
connection closed
Connection closed by foreign host.
查看我们服务器的输出,您还可以看到在此连接过程中发生了什么:
> python3 example1.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 59778)
('127.0.0.1', 59778) --> hello
('127.0.0.1', 59778) --> nice
('127.0.0.1', 59778) --> fdkgsnas
('127.0.0.1', 59778) quit
如前所述,服务器被设计为作为事件循环在反应器中永远运行,可以通过KeyboardInterrupt
异常停止。
我们已经使用socket
模块提供的低级方法成功地实现了我们的第一个 echo 服务器。在下一节中,我们将为服务器实现更高级的功能,并分析将其转换为可同时处理多个客户端的非阻塞服务器的过程。
我们试图实现的功能是有一个简单的请求处理程序,它计算整数列表的和或积,并包含在从客户端发送的数据中。具体来说,如果客户机将字符串1
、2
、4
发送到我们的服务器,那么服务器应该返回7
来计算总和,或者8
来计算乘积。
除了处理来自客户机的请求并将数据处理任务的结果发送给这些客户机之外,每个服务器还实现某种形式的数据处理。因此,该原型将作为更广泛服务器的第一个构建块,具有更复杂的功能。
我们将使用 Python 字符串的split()
方法来提取由字符串中的特定字符分隔的元素。因此,我们将要求所有来自客户端的数据都以这种方式格式化(以逗号分隔的整数),如果客户端发送的内容不是这种格式,我们只需返回一条错误消息,并要求他们生成一条新消息。
基本计算逻辑包含在Chapter18/example2.py
文件中,如下所示:
# Chapter18/example2.py
from operator import mul
from functools import reduce
try:
while True:
line = input('Please enter a list of integer, separated by commas: ')
try:
nums = list(map(int, line.split(',')))
except ValueError:
print('ERROR. Enter only integers separated by commas')
continue
print('Sum of input integers', sum(nums))
print('Product of input integers', reduce(mul, nums, 1))
except KeyboardInterrupt:
print('\nFinished.')
同样,我们使用split()
方法和','
参数来提取特定字符串中的单个数字。显然,sum()
函数用于计算参数数字列表的总和。为了计算聚合积,我们需要从operator
模块中导入mul()
方法(用于乘法),以及从functools
模块中导入reduce()
方法,以将乘法应用于所考虑的数字列表中的每个元素。
作为旁注,传递给reduce()
方法的第三个参数(数字1
是还原过程的起始值。若要了解有关还原操作的更多信息,请阅读流程中的第 7 章、还原运算符(如果您尚未阅读)。
至于我们的实际服务器,我们还将跟踪计算模式。计算模式(默认为执行求和)指示服务器是否应该对输入的数字列表执行求和和和乘法。该模式对于每个客户端连接也是唯一的,并且可以由该客户端切换。具体来说,如果特定客户端发送的数据是字符串sum
,那么我们将把计算模式切换为求和,字符串product
也是如此。
现在,让我们看一看这个服务器在完整的 T0 文件中的完整实现:
# Chapter18/example3.py
import socket
from operator import mul
from functools import reduce
# Main event loop
def reactor(host, port):
sock = socket.socket()
sock.bind((host, port))
sock.listen(5)
print(f'Server up, running, and waiting for call on {host} {port}')
try:
while True:
conn, cli_address = sock.accept()
process_request(conn, cli_address)
finally:
sock.close()
def process_request(conn, cli_address):
file = conn.makefile()
print(f'Received connection from {cli_address}')
mode = 'sum'
try:
conn.sendall(b'<welcome: starting in sum mode>\n')
while True:
line = file.readline()
if line:
line = line.rstrip()
if line == 'quit':
conn.sendall(b'connection closed\r\n')
return
if line == 'sum':
conn.sendall(b'<switching to sum mode>\r\n')
mode = 'sum'
continue
if line == 'product':
conn.sendall(b'<switching to product mode>\r\n')
mode = 'product'
continue
print(f'{cli_address} --> {line}')
try:
nums = list(map(int, line.split(',')))
except ValueError:
conn.sendall(
b'ERROR.
Enter only integers separated by commas\n')
continue
if mode == 'sum':
conn.sendall(b'Sum of input numbers: %a\r\n'
% str(sum(nums)))
else:
conn.sendall(b'Product of input numbers: %a\r\n'
% str(reduce(mul, nums, 1)))
finally:
print(f'{cli_address} quit')
file.close()
conn.close()
if __name__ == '__main__':
reactor('localhost', 8080)
服务器的 reactor 组件与前面的示例保持相同,因为事件循环处理相同类型的逻辑。在我们的用户业务逻辑部分process_request()
函数中,我们仍然使用makefile()
方法返回的file
对象来获取服务器客户端发送的数据。如果客户端发送字符串quit
,则该客户端与服务器之间的连接仍将停止。
这个程序中的第一个新东西是process_request()
函数中的局部变量mode
。此变量指定我们前面讨论的计算模式,并具有字符串sum
的默认值。如您所见,在process_request()
函数中try
块的最末端,该变量决定将何种数据发送回当前客户端:
if mode == 'sum':
conn.sendall(b'Sum of input numbers: %a\r\n'
% str(sum(nums)))
else:
conn.sendall(b'Product of input numbers: %a\r\n'
% str(reduce(mul, nums, 1)))
此外,如果从客户端发送的数据等于字符串sum
,则mode
变量将设置为sum
,同样适用于字符串product
。客户还将收到一条消息,宣布计算模式已更改。此逻辑包含在代码的以下部分中:
if line == 'sum':
conn.sendall(b'<switching to sum mode>\r\n')
mode = 'sum'
continue
if line == 'product':
conn.sendall(b'<switching to product mode>\r\n')
mode = 'product'
continue
现在,让我们看看这个服务器在实际实验中的性能。执行该程序以运行服务器,您将看到与上一示例类似的输出:
> python3 example3.py
Server up, running, and waiting for call on localhost 8080
同样,我们将使用 Telnet 为此服务器创建客户端。当您通过 Telnet 客户端连接到服务器时,请尝试输入一些数据以测试我们实现的服务器逻辑。以下代码显示了我通过各种类型的输入获得的结果:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
1,2
Sum of input numbers: '3'
4,9
Sum of input numbers: '13'
product
<switching to product mode>
0,-3
Product of input numbers: '0'
5,-9,10
Product of input numbers: '-450'
hello
ERROR. Enter only integers separated by commas
a,1
ERROR. Enter only integers separated by commas
quit
connection closed
Connection closed by foreign host.
您可以看到,我们的服务器可以按预期处理请求。具体来说,它可以计算给定格式正确的输入字符串的总和和乘积;可适当切换计算模式;如果输入字符串格式不正确,它可以向客户端发送错误消息。同样,这个一直在运行的服务器可以通过KeyboardInterrupt
异常停止。
我们将发现,我们当前拥有的服务器不是非阻塞的。换句话说,它不能同时处理多个客户端。在本节中,您将学习如何在当前服务器上进行构建,使其成为非阻塞服务器,除了使用socket
模块中的低级功能外,还使用有助于并发编程的 Python 关键字。
现在我们将说明,我们当前拥有的服务器不能同时拥有多个客户端。首先,执行Chapter18/example3.py
文件再次运行服务器,如下所示:
> python3 example3.py
Server up, running, and waiting for call on localhost 8080
与我们在前面的示例中所做的类似,现在让我们打开另一个终端,并在运行的服务器中使用 Telnet:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
要为此服务器创建第二个客户端,请打开另一个终端并键入相同的telnet
命令,如下所示:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
在这里,我们已经可以看到服务器没有正确处理第二个客户端:它没有将欢迎消息(<welcome: starting in sum mode>
)发送回该客户端。如果我们查看服务器的输出,还可以看到它只注册了一个客户机,这是两个客户机中的第一个:
> python3 example3.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 61099)
接下来,我们将尝试输入来自每个客户机的输入。我们将看到服务器只成功地处理来自第一个客户机的请求。具体来说,以下是我从第一个客户机的输出,包括各种类型的输入:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
hello
ERROR. Enter only integers separated by commas
1,5
Sum of input numbers: '6'
product
<switching to product mode>
6,7
Product of input numbers: '42'
现在,在第一个客户端仍保持与服务器的连接的情况下,切换到第二个客户端的终端并尝试输入自己的输入。您将看到,与第一个客户端不同,此客户端没有从服务器接收任何消息:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
1,5
product
6,7
如果查看服务器输出,我们将看到服务器仅处理来自第一个客户端的请求:
> python3 example3.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 61099)
('127.0.0.1', 61099) --> hello
('127.0.0.1', 61099) --> 1,5
('127.0.0.1', 61099) --> 6,7
第二个客户端能够与服务器交互的唯一方法是第一个客户端与服务器断开连接,换句话说,当我们停止第一个客户端与服务器之间的连接时,如下所示:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
hello
ERROR. Enter only integers separated by commas
1,5
Sum of input numbers: '6'
product
<switching to product mode>
6,7
Product of input numbers: '42'
quit
connection closed
Connection closed by foreign host.
现在,如果切换到第二个客户机的终端,您将看到该客户机将被来自服务器的消息刷新,这些消息本应在之前接收到:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
1,5
product
6,7
<welcome: starting in sum mode>
ERROR. Enter only integers separated by commas
Sum of input numbers: '6'
<switching to product mode>
Product of input numbers: '42'
来自服务器的所有适当回复现在都已存在,但它们是一次性发送的,而不是在每个输入消息之后发送的。我们的服务器终端的输出也说明了同样的信息激增,如下所示:
> python3 example3.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 61099)
('127.0.0.1', 61099) --> hello
('127.0.0.1', 61099) --> 1,5
('127.0.0.1', 61099) --> 6,7
('127.0.0.1', 61099) quit
Received connection from ('127.0.0.1', 61100)
('127.0.0.1', 61100) --> hello
('127.0.0.1', 61100) --> 1,5
('127.0.0.1', 61100) --> 6,7
这个输出使服务器看起来好像在第一个客户端退出后才从第二个客户端接收到连接,而实际上,我们创建了两个客户端,并让它们同时与服务器通信。这是因为我们当前的服务器一次只能处理一个客户机,并且只有在当前客户机退出后,它才能移动到请求通信通道的下一个客户机。我们称之为阻塞服务器。
在下一节中,我们将讨论如何将当前拥有的阻塞服务器转换为非阻塞服务器,同时保留计算功能。为了做到这一点,我们首先需要研究 Python 编程中的另一个概念,称为生成器。您可能已经使用过 Python 生成器,但为了重述一下,我们将在本节中介绍生成器的关键功能。
生成器是返回迭代器的函数,可以动态暂停和恢复。生成器的返回值通常与列表对象进行比较,因为生成器迭代器是惰性(https://en.wikipedia.org/wiki/Lazy_evaluation 且仅在明确要求时产生结果。由于这个原因,生成器迭代器在内存管理方面更有效,因此在涉及大量数据时,它通常优于列表。
每个生成器都被定义为一个函数,但是我们没有在函数块中使用关键字return
,而是使用了yield
,这表示返回值只是暂时的,在获得返回值后,整个生成器本身仍然可以恢复。让我们看看 Python 生成器在一个示例中是如何工作的,该示例包含在Chapter18/example4.py
文件中,如下所示:
# Chapter18/example4.py
def read_data():
for i in range(5):
print('Inside the inner for loop...')
yield i * 2
result = read_data()
for i in range(6):
print('Inside the outer for loop...')
print(next(result))
print('Finished.')
这里,我们有一个名为read_data()
的生成器,它以惰性方式返回 2 的倍数,从 0 到 8。这是通过关键字yield
完成的,该关键字位于正常函数中返回值的前面:i * 2
。请注意,yield
关键字放在迭代器中应发回的单个元素前面,这有助于延迟生成。
现在,在我们的主程序中,我们正在获取整个迭代器并将其存储在变量result
中。然后,我们使用next()
函数在该迭代器中循环六次(显然,该函数返回传入的迭代器中的下一个元素)。执行代码后,您的输出应类似于以下内容:
> python3 example4.py
Inside the outer for loop...
Inside the inner for loop...
0
Inside the outer for loop...
Inside the inner for loop...
2
Inside the outer for loop...
Inside the inner for loop...
4
Inside the outer for loop...
Inside the inner for loop...
6
Inside the outer for loop...
Inside the inner for loop...
8
Inside the outer for loop...
Traceback (most recent call last):
File "example4.py", line 11, in <module>
print(next(result))
StopIteration
您可以看到,即使迭代器是在我们循环之前从read_data()
生成器生成并返回的,但只有在我们试图从迭代器获取更多项时,才执行生成器中的实际指令。
输出中的 print 语句彼此交替放置(一个 print 语句来自外部for
循环,另一个来自内部for
循环)说明了这一点:执行流首先进入外部for
循环,尝试访问迭代器中的下一项,进入发电机,并进入其自身的for
回路。一旦执行流到达yield
关键字,它就会返回到主程序。此过程持续到for
循环之一终止;在我们的例子中,发电机中的for
回路首先停止,因此我们在最后遇到了StopIteration
错误。
迭代器生成过程中的惰性来自这样一个事实:生成器在到达yield
关键字时停止执行,并且只有在外部指令(在本例中是通过next()
函数)请求时才继续执行。同样,这种形式的数据生成在内存管理中比简单地生成可能需要迭代的所有内容(如列表)要高效得多。
生成器与构建异步服务器的目的有什么关系?我们当前的服务器无法处理多个客户机的原因是,我们在用户业务逻辑部分使用的readline()
函数是一个阻塞函数,用于获取客户机数据,只要当前file
对象仍然打开,就可以防止执行流流向其他潜在客户机。这就是为什么,当当前客户机停止与服务器的连接时,下一个客户机会立即收到我们前面看到的大量信息。
如果我们可以将此函数重写为异步函数,允许执行流在不同的客户端之间切换,而这些客户端都连接到服务器,那么该服务器将成为非阻塞的。我们将通过使用异步生成器为服务器同时从多个客户机并发生成数据来实现这一点。
为了查看我们将为服务器使用的异步生成器的底层结构,让我们首先考虑一下 HORT T0 文件,如下:
# Chapter18/example5.py
import types
@types.coroutine
def read_data():
def inner(n):
try:
print(f'Printing from read_data(): {n}')
callback = gen.send(n * 2)
except StopIteration:
pass
data = yield inner
return data
async def process():
try:
while True:
data = await read_data()
print(f'Printing from process(): {data}')
finally:
print('Processing done.')
gen = process()
callback = gen.send(None)
def main():
for i in range(5):
print(f'Printing from main(): {i}')
callback(i)
if __name__ == '__main__':
main()
我们仍在考虑打印出 2 的倍数,介于 0 和 8 之间。在本例中,process()
函数是我们的异步发电机。您可以看到,事实上,生成器中没有yield
关键字;这是因为我们使用的是await
关键字。此异步生成器负责打印由另一个生成器read_data()
计算的 2 的倍数。
@types.coroutine
装饰器用于将生成器read_data()
转换为一个协程函数,该函数返回一个基于生成器的协程,该协程仍然可以用作常规生成器,但也可以等待。这种基于生成器的协同路由是将阻塞服务器转换为非阻塞服务器的关键。协同程序使用send()
方法执行计算,这是一种向生成器提供输入的方法(在本例中,我们向process()
生成器提供 2 的倍数)。
这个协同程序返回一个回调,稍后主程序可以调用它。这就是为什么在主程序中循环通过range(5)
之前,我们需要跟踪process()
生成器本身(存储在变量gen
中)和返回的回调(存储在变量callback
中)。具体来说,回调是gen.send(None)
的返回值,用于启动process()
生成器的执行。最后,我们简单地循环前面提到的range
对象,并使用适当的输入调用callback
对象。
关于这种使用异步发电机的方法背后的理论已经说了很多。现在,让我们看看它的实际行动。执行该程序,您将获得以下输出:
> python3 example5.py
Printing from main(): 0
Printing from read_data(): 0
Printing from process(): 0
Printing from main(): 1
Printing from read_data(): 1
Printing from process(): 2
Printing from main(): 2
Printing from read_data(): 2
Printing from process(): 4
Printing from main(): 3
Printing from read_data(): 3
Printing from process(): 6
Printing from main(): 4
Printing from read_data(): 4
Printing from process(): 8
Processing done.
在输出(特别是 print 语句)中,我们仍然可以观察到任务切换事件,这些事件对于前面章节中讨论的异步编程和延迟生成输出的生成器都是至关重要的。本质上,我们实现了与前一个示例相同的目标(打印 2 的倍数),但在这里,我们使用异步生成器(带有async
和await
关键字)来促进任务切换事件,并且我们还能够通过回调将特定参数传递给生成器。这些技术结合起来,形成了将应用于当前阻塞服务器的基本结构。
最后,我们将考虑再次实现非阻塞服务器的问题。这里,我们将应用前面讨论的异步生成器,以方便异步读取和处理从服务器客户端接收的数据。服务器的实际代码包含在Chapter18/example6.py
文件中;我们将经历它的各个部分,因为它是一个相对较长的项目。让我们将注意力转向本程序中的全局变量,如下所示:
# Chapter18/example6.py
from collections import namedtuple
###########################################################################
# Reactor
Session = namedtuple('Session', ['address', 'file'])
sessions = {} # { csocket : Session(address, file)}
callback = {} # { csocket : callback(client, line) }
generators = {} # { csocket : inline callback generator }
为了能够同时成功地为多个客户机提供服务,我们将允许服务器同时拥有多个会话(每个客户机一个会话),因此,我们需要跟踪多个字典,每个字典将保存有关当前会话的一条特定信息。
具体而言,sessions
字典将客户机套接字连接映射到Session
对象,该对象是 Pythonnamedtuple
对象,包含客户机地址和与该客户机连接关联的file
对象。callback
字典将客户端套接字连接映射到回调,该回调是我们稍后将实现的异步生成器的返回值;每个回调都将其相应的客户端套接字连接和从该客户端读取的数据作为参数。最后,generators
字典将客户端套接字连接映射到相应的异步生成器。
现在,让我们看一下函数的函数:
# Chapter18/example6.py
import socket, select
# Main event loop
def reactor(host, port):
sock = socket.socket()
sock.bind((host, port))
sock.listen(5)
sock.setblocking(0) # Make asynchronous
sessions[sock] = None
print(f'Server up, running, and waiting for call on {host} {port}')
try:
while True:
# Serve existing clients only if they already have data ready
ready_to_read, _, _ = select.select(sessions, [], [], 0.1)
for conn in ready_to_read:
if conn is sock:
conn, cli_address = sock.accept()
connect(conn, cli_address)
continue
line = sessions[conn].file.readline()
if line:
callback[conn](conn, line.rstrip())
else:
disconnect(conn)
finally:
sock.close()
除了之前的阻塞服务器之外,我们还添加了一些指令:我们使用socket
模块中的setblocking()
方法来潜在地使服务器异步或非阻塞;在启动服务器时,我们还将该特定套接字注册到sessions
字典中,暂时使用None
值。
在我们的无限while
循环(事件循环)中,是我们试图实现的新非阻塞特性的一部分。首先,我们使用select
模块中的select()
方法从sessions
字典中挑出准备读取的套接字(换句话说,具有可用数据的套接字)。由于该方法的第一个参数用于读取数据,第二个参数用于写入数据,第三个参数用于异常数据,因此我们仅在第一个参数中传递sessions
字典。第四个参数指定方法的超时时间(秒);如果未指定,该方法将无限阻塞,直到sessions
中至少有一项可用,这不适用于我们的非阻塞服务器。
接下来,对于每个准备好读取的客户机套接字连接,如果该连接对应于我们的原始服务器套接字,我们将接受该连接并调用connect()
函数(我们将很快看到)。在这个for
循环中,我们还将处理回调方法。具体来说,我们将访问当前套接字连接会话的file
属性(回想一下,每个会话都有address
属性和file
属性),并将使用readline()
方法从中读取数据。现在,如果我们读取的是有效数据,那么我们将把它(连同当前客户端连接)传递给相应的回调;否则,我们将终止连接。
请注意,即使我们的服务器通过将套接字设置为非阻塞来实现异步,前面的readline()
方法仍然是一个阻塞函数。readline()
函数在其输入数据(ASCII 中的'\r'
字符)中到达回车时返回。这意味着,如果客户端发送的数据不包含回车符,那么readline()
函数将无法返回。但是,由于服务器仍然是非阻塞的,因此将引发错误异常,以便其他客户端不会被阻塞。
现在,让我们看一下新的助手函数:
# Chapter18/example6.py
def connect(conn, cli_address):
sessions[conn] = Session(cli_address, conn.makefile())
gen = process_request(conn)
generators[conn] = gen
callback[conn] = gen.send(None) # Start the generator
def disconnect(conn):
gen = generators.pop(conn)
gen.close()
sessions[conn].file.close()
conn.close()
del sessions[conn]
del callback[conn]
connect()
函数将在客户端连接具有准备读取的数据时调用,该函数将在与客户端的有效连接开始时启动启动指令。首先,它初始化与特定客户端连接相关联的namedtuple
对象(我们仍在使用makefile()
方法创建file
对象)。该函数的其余部分是我们在前面讨论的异步生成器的使用模式中看到的:我们将客户端连接传递给process_request()
,它现在是一个异步生成器;在generators
字典中注册;呼叫send(None)
启动发电机;并将返回值存储到callback
字典中,以便以后可以调用它(特别是在我们刚才看到的反应堆中事件循环的最后一部分)。
另一方面,disconnect()
功能有助于在与客户端的连接停止时执行各种清洁指令。它从generators
字典中删除与客户端连接关联的生成器,并关闭生成器、存储在sessions
字典中的file
对象以及客户端连接本身。最后,它从其余的字典中删除与客户端连接对应的键。
我们来关注一下新的process_request()
函数,它现在是一个异步发电机:
# Chapter18/example6.py
from operator import mul
from functools import reduce
###########################################################################
# User's Business Logic
async def process_request(conn):
print(f'Received connection from {sessions[conn].address}')
mode = 'sum'
try:
conn.sendall(b'<welcome: starting in sum mode>\n')
while True:
line = await readline(conn)
if line == 'quit':
conn.sendall(b'connection closed\r\n')
return
if line == 'sum':
conn.sendall(b'<switching to sum mode>\r\n')
mode = 'sum'
continue
if line == 'product':
conn.sendall(b'<switching to product mode>\r\n')
mode = 'product'
continue
print(f'{sessions[conn].address} --> {line}')
try:
nums = list(map(int, line.split(',')))
except ValueError:
conn.sendall(
b'ERROR. Enter only integers separated by commas\n')
continue
if mode == 'sum':
conn.sendall(b'Sum of input integers: %a\r\n'
% str(sum(nums)))
else:
conn.sendall(b'Product of input integers: %a\r\n'
% str(reduce(mul, nums, 1)))
finally:
print(f'{sessions[conn].address} quit')
处理客户端数据和执行计算的逻辑保持不变,此新函数的唯一区别是async
关键字(放在def
关键字前面)和与新readline()
函数一起使用的await
关键字。本质上,这些差异将我们的process_request()
函数转换为非阻塞函数,条件是新的readline()
函数也是非阻塞的:
# Chapter18/example6.py
import types
@types.coroutine
def readline(conn):
def inner(conn, line):
gen = generators[conn]
try:
callback[conn] = gen.send(line) # Continue the generator
except StopIteration:
disconnect(conn)
line = yield inner
return line
与我们在前面的示例中看到的类似,我们正在从 Python 导入types
模块,并使用@types.coroutine
装饰器使readline()
函数成为基于生成器的协同例程,这是非阻塞的。每次调用回调(接收一个客户端连接和一行数据)时,执行流都会进入该协程中的inner()
函数并执行指令。
具体地说,它将数据行发送到生成器,这将使process_request()
中的指令能够异步处理它,并将返回值存储到相应的回调中,除非到达生成器的末尾,在这种情况下,将调用disconnect()
函数。
我们的最后一项任务是测试此服务器是否能够同时处理多个客户端。要执行此操作,请首先执行以下脚本:
> python3 example6.py
Server up, running, and waiting for call on localhost 8080
与前面看到的类似,打开另外两个终端,并将 Telnet 与以下两个终端一起使用到正在运行的服务器中:
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
<welcome: starting in sum mode>
正如您所看到的,两个客户端都得到了正确的处理:都能够连接,并且都接收到欢迎消息。服务器输出也说明了这一点,如下所示:
> python3 example6.py
Server up, running, and waiting for call on localhost 8080
Received connection from ('127.0.0.1', 63855)
Received connection from ('127.0.0.1', 63856)
进一步的测试可能涉及同时向服务器发送消息,服务器仍然可以处理这些消息。服务器还可以跟踪各个客户端特有的各个计算模式(换句话说,假设每个客户端都有单独的计算模式)。我们已经从零开始成功地构建了一个非阻塞的并发服务器。
通常情况下,低级网络编程涉及套接字的操作和处理(定义为特定计算机网络节点内的理论端点,负责从节点接收或发送数据)。服务器端通信的体系结构由涉及套接字处理的多个步骤组成,例如绑定、侦听、接受、读取和写入。socket
模块提供了一个直观的 API,便于执行这些步骤。
要使用socket
模块创建非阻塞服务器,需要实现异步生成器,以便执行流在任务和数据之间切换。这个过程还涉及使用回调,这些回调可以在以后由执行流运行。这两个元素允许服务器同时读取和处理来自多个客户端的数据,从而使服务器成为非阻塞的。
我们将在下一章以设计和实现并发程序的实用技术作为本书的结尾。具体地说,我们将讨论如何系统有效地测试、调试和调度并发应用。
- 什么是插座?它与网络编程有什么关系?
- 当潜在客户端请求连接时,服务器端通信的过程是什么?
socket
模块提供了哪些方法来促进服务器端的底层网络编程?- 什么是发电机?与 Python 列表相比,它们的优势是什么?
- 什么是异步发电机?如何将它们应用于构建非阻塞服务器?
有关更多信息,请参阅以下链接:
- 并发性主题演讲,PyBay 2017,雷蒙德·赫廷格(https://pybay.com/site_media/slides/raymond2017-keynote/async_examples.html
- 一个简单的 Python Web 服务器,Stephen C.Phillips(blog.scphillips.com/posts/2012/12/A-simple-Python-webserver/)
- 如何在 Python中使用 TCP 套接字,Alexander Stepanov(steelkiwi.com/blog/working-TCP-Sockets/)
- Python 套接字编程、Nathan Jennings(realpython.com/Python sockets/#多连接客户端和服务器)
- Python 生成器简介(realpython.com/Introduction-to-Python-Generators/)