
Kafka's Generic Request Path

(This is from a source-code reading session I gave while interning at Meituan; the article doubled as the slides, so many parts are fairly terse.)
Taking Kafka 0.10.0 as the example, this article traces the generic request-processing path on the broker side. The focus is how a request flows through Kafka's components; it does not dig into the network layer (Java NIO) or the concrete handling of specific request types.

Overall Flow

TODO: flow chart / sequence diagram

The complete path of a single request:

  1. The client initiates a connection
  2. The acceptor, listening for connection events, sees the new client connection
  3. The acceptor assigns the new connection to one of the processors; the connection is now established and the processor waits for a request from the client to become ready
  4. The client sends a request; the processor is woken by the read-ready event
  5. The request is parsed into an internal Request object and appended to the requestQueue
  6. A handler polls the request from the request queue
  7. The handler hands the request to the KafkaApis component for processing
  8. The handler, or a delayed-operation component, produces the corresponding response and writes it to the response queue of the processor that owns the request
  9. On each poll, the processor drains responses from its own response queue
  10. The processor writes each response to the corresponding SocketChannel
  11. The network thread returns the response to the client
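
Condensed into a picture (the shared requestQueue fans requests out to the handlers; each response is routed back to the processor that owns the connection):

    client --connect--> acceptor --round-robin--> processor(i)
    client --request--> processor(i) --parse--> requestQueue (shared)
    handler <--poll-- requestQueue ; handler --dispatch--> KafkaApis --> owning component
    handler / delayed operations --response--> responseQueue(i)
    processor(i) <--poll-- responseQueue(i) --write--> SocketChannel --> client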

Key Components

From KafkaServer's startup sequence, three components turn out to be involved in generic request handling:

  1. SocketServer: establishes connections, receives requests, and sends responses
    • RequestChannel: a somewhat special component. It is the logical channel connecting the socket layer and the Apis layer, and it is managed by SocketServer
  2. KafkaApis: the single entry point for request handling, dispatching each request to the component that serves it; the Apis layer is also responsible for request throttling
  3. KafkaRequestHandlerPool: the pool holding the requestHandler threads

TODO: logical view

class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
  /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    try {
      info("starting")

      if(isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if(startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

        brokerState.newState(Starting)

        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkUtils = initZk()

        /* start log manager */
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

        /* generate brokerId */
        config.brokerId = getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

        // 1. Initialize the acceptor and processor threads and register their listen events
        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()
        // At this point the broker can already accept connections and requests; it just cannot process them yet

        // ... initialization and startup of the replica manager, controller, group coordinator, authorizer, etc.

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        // Initialize the requestHandler threads; once they are up, the broker can serve traffic normally
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
        brokerState.newState(RunningAsBroker)
        // ...
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }
}

At startup, Kafka first initializes the socket server, and only afterwards the KafkaApis facade and the handler threads. Below we look at the initialization and startup of the three key components in turn.

SocketServer

SocketServer handles everything network-related: connection management and the receiving/sending of requests and responses. Its core is Java NIO: the caller waits for network data asynchronously instead of blocking a whole thread on a single request. In practice, a thread periodically asks the network layer whether any fully received requests are available; if so it processes them, otherwise it carries on with other work.
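
As a self-contained illustration of that pattern (plain Java NIO in Scala, not Kafka code; port 9999 and the echo behavior are invented for the example), a single thread can serve many connections because it only ever blocks in select(), with a timeout:

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

// A minimal single-threaded NIO server loop (echoes bytes back).
object MiniNioServer {
  def main(args: Array[String]): Unit = {
    val selector = Selector.open()
    val server = ServerSocketChannel.open()
    server.configureBlocking(false)
    server.bind(new InetSocketAddress(9999))
    server.register(selector, SelectionKey.OP_ACCEPT)

    val buf = ByteBuffer.allocate(4096)
    while (true) {
      // wait up to 500 ms for any registered channel to become ready
      if (selector.select(500) > 0) {
        val it = selector.selectedKeys().iterator()
        while (it.hasNext) {
          val key = it.next(); it.remove()
          if (key.isAcceptable) {
            val ch = server.accept()            // a new connection is ready
            ch.configureBlocking(false)
            ch.register(selector, SelectionKey.OP_READ)
          } else if (key.isReadable) {
            val ch = key.channel().asInstanceOf[SocketChannel]
            buf.clear()
            if (ch.read(buf) < 0) ch.close()    // peer closed the connection
            else { buf.flip(); ch.write(buf) }  // echo back whatever arrived
          }
        }
      }
      // select() returned: the thread is free to do unrelated work here
    }
  }
}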

Initialization

SocketServer's initialization mainly reserves, based on the configuration, the arrays for the acceptor and processor threads, and initializes the RequestChannel.

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {

  private val endpoints = config.listeners // "listeners": the endpoints the broker listens on
  private val numProcessorThreads = config.numNetworkThreads // "num.network.threads": the number N of network threads
  private val maxQueuedRequests = config.queuedMaxRequests // "queued.max.requests": capacity of the request queue
  private val totalProcessorThreads = numProcessorThreads * endpoints.size // total network threads; each endpoint gets N

  private val maxConnectionsPerIp = config.maxConnectionsPerIp // "max.connections.per.ip": max connections per IP
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides // "max.connections.per.ip.overrides": per-IP/host overrides of that limit

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

  // The logical channel connecting SocketServer and KafkaApis; holds the two kinds of key queues: requestQueue and responseQueues
  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  // All network threads in one array; every N consecutive threads belong to the same endpoint
  private val processors = new Array[Processor](totalProcessorThreads)

  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() // the acceptor threads, one per endpoint
  private var connectionQuotas: ConnectionQuotas = _ // per-IP connection counting

  private val allMetricNames = (0 until totalProcessorThreads).map { i =>
    val tags = new util.HashMap[String, String]()
    tags.put("networkProcessor", i.toString)
    metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
  } // the io-wait-ratio metric of each processor

  // register the processor threads for notification of responses
  requestChannel.addResponseListener(id => processors(id).wakeup())
}

SocketServer is also responsible for initializing the RequestChannel, which mostly means setting up its two kinds of queues: the requestQueue and the responseQueues.

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  // Note the asymmetry between the two kinds of queues:
  //  - all handler threads share a single requestQueue
  //  - each processor thread owns its own responseQueue
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
}
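
The accessors around these queues, whose names (sendRequest, receiveRequest, sendResponse, receiveResponse) appear throughout the code below, are thin wrappers. Their bodies are paraphrased here as a sketch, with the metrics bookkeeping omitted:

import java.util.concurrent.TimeUnit

// Paraphrased sketch of RequestChannel's queue accessors (not verbatim source)
def sendRequest(request: RequestChannel.Request) {
  requestQueue.put(request) // blocks when the queue is full; this is where a processor can stall
}

def receiveRequest(timeout: Long): RequestChannel.Request =
  requestQueue.poll(timeout, TimeUnit.MILLISECONDS) // called by handler threads

def sendResponse(response: RequestChannel.Response) {
  responseQueues(response.processor).put(response) // route the response back to the owning processor
  for (onResponse <- responseListeners)
    onResponse(response.processor) // wakes that processor's selector (see addResponseListener above)
}

def receiveResponse(processor: Int): RequestChannel.Response =
  responseQueues(processor).poll() // non-blocking: a processor never waits on its response queue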

Startup

Kafka can be configured with multiple listener endpoints. On startup, SocketServer starts one acceptor thread per endpoint, and each acceptor gets N processor threads. Once all acceptor threads have started successfully, SocketServer's startup is complete.

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
  def startup() {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) // per-IP connection counting

      val sendBufferSize = config.socketSendBufferBytes // "socket.send.buffer.bytes": send buffer size
      val recvBufferSize = config.socketReceiveBufferBytes // "socket.receive.buffer.bytes": receive buffer size
      val brokerId = config.brokerId // "broker.id"

      var processorBeginIndex = 0
      // one acceptor per endpoint; N processors per acceptor
      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex) {
          // Initialize the Processor threads; this mainly sets up each one's Selector
          processors(i) = newProcessor(i, connectionQuotas, protocol)
        }

        // Initialize the acceptor thread; the processor threads bound to it are started here as well
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)

        // Why is this thread not started in daemon mode?
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup() // block here until the acceptor has started successfully (i.e. has registered for OP_ACCEPT)
        processorBeginIndex = processorEndIndex
      }
    }

    newGauge("NetworkProcessorAvgIdlePercent",
      new Gauge[Double] {
        def value = allMetricNames.map( metricName =>
          metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
      }
    )

    info("Started " + acceptors.size + " acceptor threads")
  }
}

KafkaApis

We will not expand on this class; its construction mainly initializes the quota managers. KafkaApis' main job is to dispatch each request to the matching component (the dispatch code appears in the handler section below).

KafkaRequestHandlerPool

By the time KafkaRequestHandlerPool is initialized, SocketServer has already started, meaning every acceptor and processor is ready and the broker can accept requests. The pool's initialization therefore just starts the KafkaRequestHandler threads; once they are up, request processing can begin.

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  // the handlers' idle-percent metric is aggregated across the whole pool
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  // threads holds the JVM Thread objects; runnables holds the KafkaRequestHandler logic they execute
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    // started as daemon threads
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

  def shutdown() {
    info("shutting down")
    for(handler <- runnables)
      handler.shutdown
    for(thread <- threads)
      thread.join
    info("shut down completely")
  }
}

Thread Model

The overall flow above showed that a request is handled and relayed by several different threads; this part focuses on what each thread actually does. In short, Kafka's network threading follows the multithreaded Reactor pattern, in a 1+N+M arrangement: 1 acceptor, N processors, M handlers.

  • acceptor: listens on the port, accepts new connections, and hands them to the processors round-robin
  • processor: receives requests and sends responses
  • handler: takes the requests the processors put on the requestQueue, processes them, and appends the generated responses to the response queues

Besides Reactor there is also the Proactor pattern, which is out of scope here; if interested, see 高性能网络框架:Reactor 和 Proactor (TODO: link).
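
As a toy model of the 1+N+M division of labor (illustrative only; strings stand in for connections and requests, and none of these names are Kafka's):

import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

// A toy 1+N+M reactor: 1 acceptor, N processors, M handlers.
object ToyReactor {
  val N = 3; val M = 8

  case class Request(processor: Int, payload: String)
  case class Response(processor: Int, payload: String)

  val requestQueue   = new ArrayBlockingQueue[Request](500)              // shared by all handlers
  val responseQueues = Array.fill(N)(new LinkedBlockingQueue[Response]()) // one per processor
  val connQueues     = Array.fill(N)(new LinkedBlockingQueue[String]())   // pending new connections per processor

  def main(args: Array[String]): Unit = {
    // 1 acceptor: hand out "connections" round-robin
    new Thread(() => {
      var i = 0
      while (true) {
        connQueues(i % N).put(s"conn-$i"); i += 1; Thread.sleep(100)
      }
    }, "acceptor").start()

    // N processors: read requests from their connections, enqueue them, drain their own responses
    for (p <- 0 until N) new Thread(() => {
      while (true) {
        val conn = connQueues(p).poll()
        if (conn != null) requestQueue.put(Request(p, s"req from $conn"))
        var resp = responseQueues(p).poll()
        while (resp != null) {
          println(s"processor-$p sends: ${resp.payload}")
          resp = responseQueues(p).poll()
        }
        Thread.sleep(10)
      }
    }, s"processor-$p").start()

    // M handlers: process requests, put responses on the owning processor's queue
    for (h <- 0 until M) new Thread(() => {
      while (true) {
        val req = requestQueue.take()
        responseQueues(req.processor).put(Response(req.processor, s"resp to [${req.payload}]"))
      }
    }, s"handler-$h").start()
  }
}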

acceptor

The acceptor's main job is to listen on the port and hand new connections to its processor threads round-robin. Because the acceptor sits at the connection entry point, it is also responsible for enforcing the connection limits.

def run() {
  // Register for OP_ACCEPT: fires when a new client connection arrives.
  // The related OP_CONNECT event is client-side only; it fires when a SocketChannel.connect() has completed.
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete() // decrement the startup latch; at 0 all awaiters wake up, so SocketServer.startup() resumes after this
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          // there are new connections
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                // round-robin; the processor merely appends the channel to a queue, to be registered on its next poll
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want the
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}

When a new connection arrives, the acceptor creates the SocketChannel for it and hands the channel off to a processor. The processor does not deal with the new connection right away; it buffers it and registers it during its next poll cycle.

// acceptor
def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept() // takes the pending connection off the listen backlog and returns its SocketChannel
  try {
    // throws TooManyConnectionsException (rejecting the connection) if this address exceeds its connection limit
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true) // TCP_NODELAY: disable Nagle's algorithm so small packets go out immediately
    socketChannel.socket().setKeepAlive(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
      .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
        socketChannel.socket.getSendBufferSize, sendBufferSize,
        socketChannel.socket.getReceiveBufferSize, recvBufferSize))

    processor.accept(socketChannel) // hand the channel to the processor; it just enqueues it and registers the new connection on its next poll
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}

// processor
def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup() // wake the selector in case it is blocked on IO
}

processor threads

The processor receives client requests and returns the results to the clients. Each loop iteration does, in order:

  1. Register the new connections the acceptor assigned during the previous poll
  2. Process new response tasks
  3. Run selector.poll, waiting for IO events
  4. Take the newly completed requests and append them to the requestQueue
  5. Remove fully sent responses from inflightResponses
  6. For closed connections, remove their pending responses from inflightResponses

TODO: could the order of these steps be changed?

override def run() {
  startupComplete() // started: decrement the latch (though nothing currently waits for processor startup)
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()   // 1. register new connections that arrived during the previous poll
      // register any new responses for writing
      processNewResponses()       // 2. process this processor's new response tasks
      poll()                      // 3. selector.poll(300): blocks until a channel is ready or the timeout expires
      processCompletedReceives()  // 4. append fully received requests to the requestQueue; may block under contention
      processCompletedSends()     // 5. remove fully sent responses from inflightResponses
      processDisconnected()       // 6. clean up inflightResponses entries of closed connections
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}

Registering new connections

The main task is registering each new connection's OP_READ event with the selector, so that the processor gets woken up when data later arrives.

private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      // build a unique connectionId
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // register the channel's OP_READ event with the selector
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
    }
  }
}

Processing new responses

After handing a new response to the selector, the processor records it in inflightResponses, the collection of responses currently being sent.
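A few NIO basics first (my own summary in plain java.nio, not Kafka's wrapper): a channel is registered with a selector under an interest set; select() returns the keys whose channels are ready for one of the requested operations; and intent is changed over time by adjusting the interest ops. In miniature:

import java.nio.channels.{SelectionKey, Selector, SocketChannel}

object NioBasics {
  // A channel registers with a selector for an interest set; select() later
  // returns the keys whose channels are ready for a requested operation.
  def register(selector: Selector, channel: SocketChannel): SelectionKey = {
    channel.configureBlocking(false)
    channel.register(selector, SelectionKey.OP_READ) // interested in incoming data
  }

  // When a send is queued for this connection, additionally ask for writability...
  def startWriting(key: SelectionKey): Unit =
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)

  // ...and once the send has been fully written, stop asking, or select() will
  // keep waking up immediately (sockets are almost always writable).
  def finishWriting(key: SelectionKey): Unit =
    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
}

Kafka's own Selector and KafkaChannel classes wrap this kind of bookkeeping.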

private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    try {
      curr.responseAction match {
        case RequestChannel.NoOpAction =>
          // There is no response to send to the client, we need to read more pipelined requests
          // that are sitting in the server's socket buffer
          curr.request.updateRequestMetrics
          trace("Socket server received empty response to send, registering for read: " + curr)
          // the request needs no reply and is fully handled; unmute the channel
          selector.unmute(curr.request.connectionId)
        case RequestChannel.SendAction =>
          // hand the response to the selector
          sendResponse(curr)
        case RequestChannel.CloseConnectionAction =>
          curr.request.updateRequestMetrics
          trace("Closing socket connection actively according to the response code.")
          close(selector, curr.request.connectionId)
      }
    } finally {
      curr = requestChannel.receiveResponse(id) // next iteration
    }
  }
}

/* `protected` for test usage */
protected[network] def sendResponse(response: RequestChannel.Response) {
  trace(s"Socket server received response to send, registering for write and sending data: $response")
  val channel = selector.channel(response.responseSend.destination)
  // `channel` can be null if the selector closed the connection because it was idle for too long
  if (channel == null) {
    // the connection has been closed
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
    response.request.updateRequestMetrics()
  }
  else {
    // hand the response to the selector, then record it in inflightResponses
    selector.send(response.responseSend)
    inflightResponses += (response.request.connectionId -> response)
  }
}

poll

This step just calls selector.poll, which performs the network IO in a non-blocking fashion; if nothing becomes ready it returns after at most 300 ms.

private def poll() {
  try selector.poll(300) // this is where the thread sleeps
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      error(s"Closing processor $id due to illegal state or IO exception")
      swallow(closeAll())
      shutdownComplete()
      throw e
  }
}

Processing completed receives

The processor wraps each fully received request into an internal Request object and appends it to the requestQueue. This step can block due to lock contention on the queue.

Note also that to keep the requests of a single connection strictly ordered, one request and one response at a time, Kafka uses a mute mechanism.

TODO (question): why is a single lock shared here?

private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      val channel = selector.channel(receive.source)
      val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
        channel.socketAddress)
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
      // may block here: requestQueue is an ArrayBlockingQueue
      requestChannel.sendRequest(req)
      // To keep per-connection ordering (a later request's response must never overtake an earlier one's),
      // mute the channel until the response to this request has been delivered
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}
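
The mute mechanism itself is small. In plain NIO terms it boils down to toggling OP_READ in the key's interest set, which is roughly what Kafka's Selector.mute/unmute do through the KafkaChannel wrapper (a sketch, not Kafka's code):

import java.nio.channels.SelectionKey

object MuteSketch {
  // Muting: stop asking the selector for read events on this connection, so at
  // most one request per connection is in flight at any time
  def mute(key: SelectionKey): Unit =
    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)

  // Unmuting: resume read events once the response has been handed to the socket
  def unmute(key: SelectionKey): Unit =
    key.interestOps(key.interestOps() | SelectionKey.OP_READ)
}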

Processing completed sends

private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    // remove the fully sent response from inflightResponses
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    // resume reading from the channel
    selector.unmute(send.destination)
  }
}

Handling disconnects

Two things happen here:

  • the related responses are removed from inflightResponses
  • the ConnectionQuotas counter is decremented
private def processDisconnected() {
  selector.disconnected.asScala.foreach { connectionId =>
    val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
      throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
    }.remoteHost
    // remove any response tied to this connectionId and update its metrics
    inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
    // the channel has been closed by the selector but the quotas still need to be updated
    connectionQuotas.dec(InetAddress.getByName(remoteHost))
  }
}

KafkaRequestHandler threads

A handler pulls requests from the requestQueue and has KafkaApis dispatch them to the matching component. The handler is in charge of two things:

  • executing the request: usually this means moving it from KafkaApis on to some other component
  • generating the response: typically the response-building step is wrapped into a function that is passed along to that component; when the component finishes the request, it invokes the callback, which builds the response and appends it to the owning processor's response queue (a skeleton of this pattern follows the run loop below)
def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = SystemTime.nanoseconds
        // receiveRequest has two overloads: this one calls queue.poll(timeout) and returns null on timeout;
        // the no-arg overload calls queue.take(), which blocks until a request arrives
        req = requestChannel.receiveRequest(300)
        val idleTime = SystemTime.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if(req eq RequestChannel.AllDone) {
        // When KafkaRequestHandlerPool is shut down, it calls each handler's shutdown(),
        // which posts an AllDone request to the requestQueue to tell this run loop to exit
        debug("Kafka request handler %d on broker %d received shut down command".format(
          id, brokerId))
        return
      }
      req.requestDequeueTimeMs = SystemTime.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req) // dispatch: processing moves to KafkaApis and from there to the owning component
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
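
The response-generating half of the handler's job, promised above, is callback-shaped: "how to answer" is wrapped in a function and handed to the component doing the work, which may invoke it immediately or much later (delayed operations). Stripped to a runnable skeleton with toy types (these names are illustrative, not Kafka identifiers):

import java.util.concurrent.LinkedBlockingQueue

// The callback pattern in miniature. The handler side defines sendResponseCallback
// and hands it to whatever component executes the request; when that component
// finishes (now or later), invoking the callback routes the response to the
// owning processor's queue. All types here are toys.
object CallbackPattern {
  case class Request(processor: Int, payload: String)
  case class Response(processor: Int, payload: String)
  val responseQueues = Array.fill(3)(new LinkedBlockingQueue[Response]())

  def handle(request: Request, doWork: (Request, String => Unit) => Unit): Unit = {
    def sendResponseCallback(result: String): Unit =
      responseQueues(request.processor).put(Response(request.processor, result))
    doWork(request, sendResponseCallback)
  }

  def main(args: Array[String]): Unit = {
    // a component that completes immediately invokes the callback inline;
    // a delayed component would store it and call it when its condition is met
    handle(Request(0, "fetch"), (req, cb) => cb(s"data for ${req.payload}"))
    println(responseQueues(0).poll())
  }
}

The real instance of this pattern is handleFetchRequest, shown further below.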

KafkaApis.handle switches on the request type and calls the corresponding handler:

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.requestId) match {
      // Note: all request types flow through the same request queue; there is no prioritization
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null) {
        request.requestObj.handleError(e, requestChannel, request)
        error("Error when handling request %s".format(request.requestObj), e)
      } else {
        val response = request.body.getErrorResponse(request.header.apiVersion, e)
        val respHeader = new ResponseHeader(request.header.correlationId)

        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))

        error("Error when handling request %s".format(request.body), e)
      }
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}

Let us take a follower fetch as an example and walk through what the KafkaApis side does for one request:

  • validate the fetch request (authorization check)
  • define the callback sendResponseCallback, which builds fetch's own response and send structures
  • hand the fetch over to the replica component, passing in sendResponseCallback so that the response is generated when the call completes
/**
 * Handle a fetch request
 */
def handleFetchRequest(request: RequestChannel.Request) {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]

  // 1. authorization check
  val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
    case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
  }

  val unauthorizedPartitionData = unauthorizedRequestInfo.mapValues { _ =>
    FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)
  }

  // the callback for sending a fetch response
  // 2. define the callback that builds the fetch response
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {

    val convertedPartitionData =
      // Need to down-convert message when consumer only takes magic value 0.
      if (fetchRequest.versionId <= 1) {
        responsePartitionData.map { case (tp, data) =>

          // We only do down-conversion when:
          // 1. The message format version configured for the topic is using magic value > 0, and
          // 2. The message set contains message whose magic > 0
          // This is to reduce the message format conversion as much as possible. The conversion will only occur
          // when new message format is used for the topic and we see an old request.
          // Please note that if the message format is changed from a higher version back to lower version this
          // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
          // without format down conversion.
          val convertedData = if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) &&
            !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) {
            trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}")
            new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0))
          } else data

          tp -> convertedData
        }
      } else responsePartitionData

    val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData

    mergedPartitionData.foreach { case (topicAndPartition, data) =>
      if (data.error != Errors.NONE.code)
        debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " +
          s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}")
      // record the bytes out metrics only when the response is being sent
      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes)
      BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes)
    }

    // this is where the response data is built
    def fetchResponseCallback(delayTimeMs: Int) {
      trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
        s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes")
      // fetch defines its own response type
      val response = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs)
      // Does not use the ByteBuffer-based Send but its own implementation; presumably gathering writes for efficiency?
      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
    }


    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeMs = SystemTime.milliseconds

    // Do not throttle replication traffic
    if (fetchRequest.isFromFollower) {
      fetchResponseCallback(0)
    } else {
      quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
        FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic),
          fetchRequest.versionId),
        fetchResponseCallback)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Map.empty)
  else {
    // 3. call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong,
      fetchRequest.replicaId,
      fetchRequest.minBytes,
      authorizedRequestInfo,
      sendResponseCallback)
  }
}

Data Interfaces

Along the whole request path there are two outward-facing data types, Request and Response, marking the entry and the exit of the path.

Request

A Request is created with the processor id and the connectionId, because the response is derived from the request: once built, it has to be placed on the response queue of the same processor, which is found through the request's processor field.


// A Request is bound to its processor, because the corresponding response must go back to the same processor
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
  // These need to be volatile because the readers are in the network thread and the writers are in the request
  // handler threads or the purgatory threads
  // Timestamps (ms) for the stages of a request's life cycle; they hold raw values until updateRequestMetrics,
  // which the processor calls from processNewResponses, turns them into the final metrics
  @volatile var requestDequeueTimeMs = -1L
  @volatile var apiLocalCompleteTimeMs = -1L
  @volatile var responseCompleteTimeMs = -1L
  @volatile var responseDequeueTimeMs = -1L
  @volatile var apiRemoteCompleteTimeMs = -1L

  val requestId = buffer.getShort()

  // TODO: this will be removed once we migrated to client-side format
  // for server-side request / response format
  // NOTE: this map only includes the server-side request/response handlers. Newer
  // request types should only use the client-side versions which are parsed with
  // o.a.k.common.requests.AbstractRequest.getRequest()
  private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
    Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
    )

  // TODO: this will be removed once we migrated to client-side format
  val requestObj =
    keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull

  // if we failed to find a server-side mapping, then try using the
  // client-side request / response format
  val header: RequestHeader =
    if (requestObj == null) {
      buffer.rewind
      try RequestHeader.parse(buffer)
      catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
      }
    } else
      null
  val body: AbstractRequest =
    if (requestObj == null)
      try {
        // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
          new ApiVersionsRequest
        else
          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
      } catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
      }
    else
      null

  buffer = null
  private val requestLogger = Logger.getLogger("kafka.request.logger")

  trace("Processor %d received request : %s".format(processor, requestDesc(true)))
}

Response

case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
  request.responseCompleteTimeMs = SystemTime.milliseconds

  def this(processor: Int, request: Request, responseSend: Send) =
    this(processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)

  def this(request: Request, send: Send) =
    this(request.processor, request, send)
}

trait ResponseAction
case object SendAction extends ResponseAction
case object NoOpAction extends ResponseAction
case object CloseConnectionAction extends ResponseAction

Kafka's Response differs from the usual notion of a response: it does not carry the returned bytes themselves, but describes how the data gets written into the socket channel. The response therefore mainly holds a Send, an interface expressing how the response data should be written to the channel. When a processor receives the response it simply passes the Send on to the NIO layer; only when the network side actually performs the write is writeTo invoked, writing the data the way the upper layer defined. The Send interface:

/**
 * This interface models the in-progress sending of data to a destination identified by an integer id.
 */
public interface Send {

    /**
     * The numeric id for the destination of this send
     */
    public String destination();

    /**
     * Is this send complete?
     */
    public boolean completed();

    /**
     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
     * to be completely written
     * @param channel The Channel to write to
     * @return The number of bytes written
     * @throws IOException If the write fails
     */
    public long writeTo(GatheringByteChannel channel) throws IOException;

    /**
     * Size of the send
     */
    public long size();

}
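
For contrast, a minimal Send over a single ByteBuffer might look as follows (a sketch of the idea, close in spirit to Kafka's own ByteBufferSend):

import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
import org.apache.kafka.common.network.Send

// A minimal Send wrapping one ByteBuffer. writeTo() may be called many times;
// the buffer's position tracks how far the send has progressed.
class SingleBufferSend(dest: String, buffer: ByteBuffer) extends Send {
  private val total = buffer.remaining()

  override def destination(): String = dest
  override def completed(): Boolean = !buffer.hasRemaining
  override def size(): Long = total

  override def writeTo(channel: GatheringByteChannel): Long =
    channel.write(buffer) // writes as much as the socket accepts right now; may be partial
}

Seen this way, the closing question of why some components hand-roll their own Send has a plausible answer: a custom Send controls exactly how bytes reach the channel (gathering writes, or zero-copy transfer of file-backed data) without first materializing everything into one buffer.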

Changes in Later Versions

  • KIP-291: Separating controller connections and requests from the data plane
  • KAFKA-4453: add request prioritization

In 0.10.0, control-plane requests and data-plane requests flow through the same request queue; when the queue is congested, control requests cannot get through in time. Later versions therefore split the two planes into fully independent paths, with separate endpoints, acceptors, processors, handlers, request queues, and Apis handling (configured via control.plane.listener.name). The two planes do not get equal resources: control traffic is far lighter, so it typically runs with 1 acceptor thread, 1 processor thread, and a 20-slot request queue.

Closing Notes

By reading the source, this article gave a brief tour of the broker-side request path. A few questions remain open:

  • Why do the acceptor and the processors run as ordinary user threads, while the handlers run as daemon threads?
  • The network-layer selector is still a black box; Kafka adds its own wrapper on top of Java NIO.
  • How is the boundary of a request determined, i.e. when does a request count as fully received?
  • Why do some components implement the Send interface themselves instead of simply using ByteBufferSend?
