First, a comment from the Kafka source (`org.apache.kafka.common.network.Selector`) that this post will come back to at the end:

```java
/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
 * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
 * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size. This means we might be reading additional bytes than the requested size.
 * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
 * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
 * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
 * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
 * and pop response and add to the completedReceives.
 * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */
```
As we know, Kafka communicates over TCP. Unlike much other middleware, it does not use Netty as its TCP server; instead it implements its own networking layer on top of Java NIO. The reasons why Kafka did not choose Netty are discussed here.

If you are not very familiar with Java NIO, you may want to read the following two articles first; this post assumes some understanding of NIO.
https://segmentfault.com/a/1190000012316621
https://www.jianshu.com/p/0d497fe5484a
More articles can be found on my blog: https://github.com/farmerjohngit/myblog
A few important classes
Let's first look at the network-layer architecture of the Kafka client (the diagram comes from this article). This post focuses on the Network layer.
The Network layer has two important classes: `Selector` and `KafkaChannel`. They are somewhat analogous to `java.nio.channels.Selector` and `Channel` in Java NIO. The key fields of `Selector` are shown below.
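As a reference, here is an abridged sketch of those fields. The names follow the 1.x-era Kafka source, but this is a trimmed reconstruction rather than the full class, and field types have shifted slightly between versions:

```java
// Abridged sketch of org.apache.kafka.common.network.Selector (Kafka 1.x era);
// most fields, all constructors and all methods are omitted.
public class Selector implements Selectable {
    // the underlying Java NIO selector multiplexing all connections
    private final java.nio.channels.Selector nioSelector;
    // every registered connection, keyed by connection id
    private final Map<String, KafkaChannel> channels;
    // results gathered during the current poll() call
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    // receives read off the wire but not yet handed out, one deque per channel
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    // connections that finished connecting inside connect() and thus
    // will never fire an OP_CONNECT event
    private final Set<SelectionKey> immediatelyConnectedKeys;
    // connection state changes observed during the current poll()
    private final List<String> connected;
    private final List<String> failedSends;
    // ...
}
```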
From the network layer's point of view, Kafka is split into a client side (producers and consumers; a broker also acts as a client when it fetches as a follower) and a server side (the broker). This post analyzes how the client side establishes connections and sends and receives data. The server side also relies on `Selector` and `KafkaChannel` for network transport, and at the Network layer the two sides differ very little.

Establishing a connection
When the Kafka client starts, it calls `Selector#connect` (below, unless otherwise noted, `Selector` always refers to `org.apache.kafka.common.network.Selector`) to establish a connection. The flow is much like the standard NIO flow. The one case worth calling out is when `SocketChannel#connect` returns true, a scenario mentioned in that method's Javadoc: in non-blocking mode, a local connection may be established immediately, in which case the method returns true and no CONNECT event will be triggered for it later. Kafka therefore records these special connections in a separate set, `immediatelyConnectedKeys`, and handles them specially in the following steps. An abridged version of `Selector#connect` is shown below.
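This sketch is trimmed from the 1.x-era Kafka source (logging, buffer-size configuration and some error handling dropped), so treat it as illustrative rather than exact:

```java
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);

    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);          // standard non-blocking NIO setup
    socketChannel.socket().setKeepAlive(true);
    socketChannel.socket().setTcpNoDelay(true);

    // In non-blocking mode this usually returns false and the connection
    // completes later via an OP_CONNECT event; for a local connection it
    // may return true, meaning the connection is already established.
    boolean connected = socketChannel.connect(address);

    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);

    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels,
        // so remember the key and clear its interest set for now.
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}
```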
Afterwards, the `poll` method is called to watch for network events. Because the connections in `immediatelyConnectedKeys` never trigger a CONNECT event, `poll` separately calls the `finishConnect` method on those channels. In plaintext mode the call ends up in `PlaintextTransportLayer#finishConnect`, whose implementation is sketched below.
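The following fragments are abridged from the 1.x-era Kafka source and stitched together for readability; they show how `poll` treats immediately connected keys like ordinary selected keys, and what `finishConnect` does in the plaintext transport layer:

```java
// In Selector#poll: keys with pending I/O events and the immediately
// connected keys go through the same handling routine.
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);

// In Selector#pollSelectionKeys: complete any in-progress connections.
if (isImmediatelyConnected || key.isConnectable()) {
    if (channel.finishConnect())
        this.connected.add(channel.id());
    else
        continue; // connection not done yet, try again on a later poll
}

// PlaintextTransportLayer#finishConnect: finish the TCP handshake, then
// swap the interest set from OP_CONNECT to OP_READ.
public boolean finishConnect() throws IOException {
    boolean connected = socketChannel.finishConnect();
    if (connected)
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
```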
More details about `immediatelyConnectedKeys` can be found here.

Sending data
Kafka sends data in two steps (see the sketch after this list):

1. Call `Selector#send` to stash the outgoing data in the corresponding `KafkaChannel`; this method performs no real network I/O.
2. Call `Selector#poll`. Step 1 registered interest in the WRITE event for this channel, so once the channel becomes writable, `pollSelectionKeys` sends the data out for real by calling the `KafkaChannel#write` method, which performs the actual network I/O.
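An abridged sketch of the two halves, based on the 1.x-era `KafkaChannel` source:

```java
// Step 1 (reached via Selector#send): only park the Send on the channel
// and register interest in OP_WRITE; nothing touches the network here.
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// Step 2 (called from pollSelectionKeys when the channel is writable):
// push the buffered bytes onto the wire.
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);               // the real network I/O
    if (send.completed())
        // the whole Send has been flushed; stop watching for writability
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
```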
Receiving data
If the remote end has sent data, the `poll` call will process whatever has been received. The read path inside `pollSelectionKeys` looks roughly like the sketch below.
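A fragment abridged from the 1.x-era `Selector#pollSelectionKeys`, showing how a readable channel is drained into `stagedReceives`:

```java
// If the channel is readable and nothing is already staged for it,
// keep reading complete responses off the wire and stage every one.
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive;
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}
```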
Data read from the network is first placed into the `stagedReceives` collection; afterwards, the `addToCompletedReceives` method processes that collection: for each channel it takes one `NetworkReceive` out of `stagedReceives` (if there is one) and moves it into `completedReceives`. There are two reasons for doing it this way:

1. With SSL, the data on the wire is encrypted, so the exact number of bytes the Kafka protocol asks for cannot be read precisely; a single read may therefore yield more than one complete response, and the extra ones have to be staged somewhere.
2. At most one entry per channel is added to `completedReceives` in each poll, which guarantees that requests from a channel are processed in the order they were sent.

To keep that ordering, the server mutes a channel while one of its requests is being processed, i.e., it stops reading data from that channel; only when processing completes is the channel unmuted, so data can be read from that socket again. The client side is controlled through `InFlightRequests#canSendMore` instead. The comment in the Kafka source quoted at the top of this post describes exactly this logic.
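For completeness, here is an abridged sketch of `addToCompletedReceives` from the 1.x-era source (metrics recording omitted):

```java
// Move at most ONE staged receive per channel into completedReceives,
// skipping channels that are currently muted.
private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter =
                this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                this.completedReceives.add(deque.poll());
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
```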
End
This post analyzed the implementation of Kafka's network layer. When reading the Kafka source, things get confusing if the network layer isn't understood first, e.g., how request/response ordering is guaranteed, or the fact that the real network I/O happens not in the send method but during poll, and so on.