class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { /** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup() { try { info("starting")
if(isShuttingDown.get) throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if(startupComplete.get) return
val canStartup = isStartingUp.compareAndSet(false, true) if (canStartup) { metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
/**
 * Network front end of the broker (excerpt showing construction-time state only).
 *
 * Holds the listener/thread configuration, the request channel that connects the
 * network threads to the API layer, and the processor/acceptor bookkeeping.
 */
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {

  // "listeners": the endpoints this broker listens on.
  private val endpoints = config.listeners
  // "num.network.threads": number of network (processor) threads per endpoint.
  private val numProcessorThreads = config.numNetworkThreads
  // "queued.max.requests": length of the request queue.
  private val maxQueuedRequests = config.queuedMaxRequests
  // Every endpoint gets its own group of numProcessorThreads processors.
  private val totalProcessorThreads = numProcessorThreads * endpoints.size

  // "max.connections.per.ip": connection limit per client IP.
  private val maxConnectionsPerIp = config.maxConnectionsPerIp
  // "max.connections.per.ip.overrides": presumably per-address overrides of the limit above - TODO confirm.
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

  // Logical channel between the SocketServer and KafkaApis; per the original note it holds
  // the two key queues, requestQueue and responseQueue.
  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  // All processors in one flat array; each consecutive run of numProcessorThreads slots belongs to one endpoint.
  private val processors = new Array[Processor](totalProcessorThreads)

  // Acceptor threads, keyed by endpoint.
  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
  // Left uninitialised here; assigned later (startup() creates the ConnectionQuotas instance).
  private var connectionQuotas: ConnectionQuotas = _

  // One "io-wait-ratio" metric name per processor, tagged with the processor index.
  private val allMetricNames = for (processorIndex <- 0 until totalProcessorThreads) yield {
    val tags = new util.HashMap[String, String]()
    tags.put("networkProcessor", processorIndex.toString)
    metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
  }

  // Register the processor threads for notification of responses: wake the owning processor.
  requestChannel.addResponseListener { processorId =>
    processors(processorId).wakeup()
  }
}
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup { def startup() { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) // 连接数计数
val sendBufferSize = config.socketSendBufferBytes // "socket.send.buffer.bytes" 发送缓存区大小 val recvBufferSize = config.socketReceiveBufferBytes // "socket.receive.buffer.bytes" 接受缓存区大小 val brokerId = config.brokerId // "broker.id"
var processorBeginIndex = 0 // 一个endpoint对应一个acceptor,一个acceptor对应N个processor endpoints.values.foreach { endpoint => val protocol = endpoint.protocolType val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex) { // 初始化Processor线程,主要是初始化Selector processors(i) = newProcessor(i, connectionQuotas, protocol) }
class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */ // handler的idle percent指标是聚合后的结果 private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " // TODO 弄清楚threads和runnables的区别 val threads = new Array[Thread](numThreads) // 后台启动的handler线程 val runnables = new Array[KafkaRequestHandler](numThreads) // 存放handler线程 for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) // 以Daemon方式启动线程 threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() }
/**
 * Accept loop: waits for new connections on the server channel and hands each one
 * to the next processor in round-robin order until `isRunning` becomes false.
 */
def run() {
  // Subscribe to OP_ACCEPT so the selector fires when a new client connection arrives.
  // (OP_CONNECT is the client-side counterpart: it becomes ready once SocketChannel.connect() succeeds.)
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // Signal that this thread has started; per the original note this releases SocketServer.startup().
  startupComplete()
  try {
    var nextProcessor = 0
    while (isRunning) {
      try {
        // A positive count means at least one key is ready, i.e. there are new connections.
        if (nioSelector.select(500) > 0) {
          val readyKeys = nioSelector.selectedKeys().iterator()
          while (readyKeys.hasNext && isRunning) {
            try {
              val key = readyKeys.next
              readyKeys.remove()
              if (!key.isAcceptable)
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              // The processor only queues the channel; it is picked up on the processor's next loop pass.
              accept(key, processors(nextProcessor))

              // round robin to the next processor thread (skipped when the accept above throws)
              nextProcessor = (nextProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      } catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want the
        // broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id, socketChannel.socket.getSendBufferSize, sendBufferSize, socketChannel.socket.getReceiveBufferSize, recvBufferSize))
processor.accept(socketChannel) // 将channel转移给processor处理,processor只是简单的加到一个队列中,等待下一轮poll才会真正处理心连接 } catch { case e: TooManyConnectionsException => info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) close(socketChannel) } }
override def run() { startupComplete() // 已经启动,计数器减一,不过暂时没有地方等待processor的启动成功 while (isRunning) { try { // setup any new connections that have been queued up // 1.首先注册上一轮poll期间到达的新连接 configureNewConnections() // register any new responses for writing processNewResponses() // 2. 处理这个processor的新的response任务 poll() // 3.selector.poll(300),如果有就绪消息事件,或者超时事件发生,否则会一直block processCompletedReceives() // 4.处理完整接收好的请求,加入到requestQueue,这一步会因为竞争被阻塞 processCompletedSends() // 5.更新inflightResponses队列,将发送完的response从队列中删除 processDisconnected() // 6.有连接关闭,清理inflightResponses队列中相关的response } catch { // We catch all the throwables here to prevent the processor thread from exiting. We do this because // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop. case e: ControlThrowable => throw e case e: Throwable => error("Processor got uncaught exception.", e) } }
/**
 * Drains the queue of newly accepted channels and registers each with the selector
 * under a connection id built from its local and remote host/port.
 *
 * A channel whose registration fails with a non-fatal error is closed so the socket
 * does not leak; other throwables propagate to the caller.
 */
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      // Build the connection id from both ends of the socket.
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // Register the channel with the selector under that id.
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        // Capture the peer address BEFORE closing: SocketChannel.getRemoteAddress throws
        // ClosedChannelException on a closed channel, which would replace the log line below
        // with a second exception escaping this handler.
        val remoteAddress = channel.socket.getRemoteSocketAddress
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}
/**
 * Drains this processor's response queue and applies each response's action:
 * send it, resume reading from the connection, or close the connection.
 */
private def processNewResponses() {
  var pending = requestChannel.receiveResponse(id)
  while (pending != null) {
    try {
      pending.responseAction match {
        case RequestChannel.SendAction =>
          // Hand the response over to the selector for writing.
          sendResponse(pending)
        case RequestChannel.NoOpAction =>
          // There is no response to send to the client, we need to read more pipelined requests
          // that are sitting in the server's socket buffer
          pending.request.updateRequestMetrics
          trace("Socket server received empty response to send, registering for read: " + pending)
          // The request is fully handled and needs no reply, so reading can resume on its channel.
          selector.unmute(pending.request.connectionId)
        case RequestChannel.CloseConnectionAction =>
          pending.request.updateRequestMetrics
          trace("Closing socket connection actively according to the response code.")
          close(selector, pending.request.connectionId)
      }
    } finally {
      // Advance to the next queued response even when handling the current one threw.
      pending = requestChannel.receiveResponse(id)
    }
  }
}
/**
 * Queues a response for writing on its connection and records it as in flight.
 * If the connection is no longer open, the response is dropped and only the
 * request metrics are updated.
 *
 * `protected` for test usage.
 *
 * @param response the response to send; its `responseSend.destination` identifies the connection
 */
protected[network] def sendResponse(response: RequestChannel.Response) {
  trace(s"Socket server received response to send, registering for write and sending data: $response")
  val connectionId = response.responseSend.destination
  val channel = selector.channel(connectionId)
  // `channel` can be null if the selector closed the connection because it was idle for too long
  if (channel == null) {
    // Report the connection id itself; `id` in this scope is the processor id, not the connection.
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
    response.request.updateRequestMetrics()
  } else {
    // Pass the send to the selector, then remember the response until the send completes.
    selector.send(response.responseSend)
    inflightResponses += (response.request.connectionId -> response)
  }
}
/**
 * Runs one selector poll with a timeout argument of 300. On IllegalStateException or
 * IOException the processor closes all its connections, signals shutdown completion,
 * and rethrows the same exception.
 */
private def poll() {
  // Shared failure path for both exception types; never returns normally.
  def abort(cause: Exception): Nothing = {
    error(s"Closing processor $id due to illegal state or IO exception")
    swallow(closeAll())
    shutdownComplete()
    throw cause
  }

  try {
    // The processor thread waits here (per the original note).
    selector.poll(300)
  } catch {
    case illegalState: IllegalStateException => abort(illegalState)
    case ioFailure: IOException => abort(ioFailure)
  }
}
/**
 * Turns every fully received network payload into a RequestChannel.Request, forwards it
 * to the request channel, and mutes the originating connection. Connections whose payload
 * is rejected with InvalidRequestException or SchemaException are closed.
 */
private def processCompletedReceives() {
  for (receive <- selector.completedReceives.asScala) {
    try {
      val channel = selector.channel(receive.source)
      val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName)
      val session = RequestChannel.Session(principal, channel.socketAddress)
      val req = RequestChannel.Request(
        processor = id,
        connectionId = receive.source,
        session = session,
        buffer = receive.payload,
        startTimeMs = time.milliseconds,
        securityProtocol = protocol)
      // May block: per the original note the request queue is an ArrayBlockingQueue.
      requestChannel.sendRequest(req)
      // Keep requests on one connection ordered: stop reading from this channel until the
      // response for this request has been dealt with (it is unmuted again at that point).
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid.
        // Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}
处理发送成功的response
1 2 3 4 5 6 7 8 9 10 11
/**
 * For every send the selector reports as completed: drops the matching entry from
 * `inflightResponses`, updates the request's metrics, and unmutes the connection.
 *
 * Throws IllegalStateException if a completed send has no in-flight entry.
 */
private def processCompletedSends() {
  for (send <- selector.completedSends.asScala) {
    inflightResponses.remove(send.destination) match {
      case Some(completed) =>
        completed.request.updateRequestMetrics()
        // Resume reading from this connection.
        selector.unmute(send.destination)
      case None =>
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
  }
}
连接关闭收尾
处理两件事:
1. 更新inflightResponses,将已断开连接对应的response删除
2. 更新connectionQuotas,减去该连接对应IP的连接计数
1 2 3 4 5 6 7 8 9 10 11 12
/**
 * Cleans up after connections the selector reports as disconnected: removes any
 * in-flight response for the connection (updating its request metrics) and
 * decrements the connection quota for the remote host.
 *
 * Throws IllegalStateException if a connection id cannot be parsed.
 */
private def processDisconnected() {
  for (connectionId <- selector.disconnected.asScala) {
    val parsedId = ConnectionId.fromString(connectionId).getOrElse {
      throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
    }
    val remoteHost = parsedId.remoteHost
    // Drop the response still in flight for this connection, if any, and update its metrics.
    for (orphaned <- inflightResponses.remove(connectionId))
      orphaned.request.updateRequestMetrics()
    // the channel has been closed by the selector but the quotas still need to be updated
    connectionQuotas.dec(InetAddress.getByName(remoteHost))
  }
}
/**
 * Request handler loop: repeatedly takes a request from the request channel and
 * dispatches it to `apis.handle`, until the `RequestChannel.AllDone` marker arrives.
 * Any Throwable raised while waiting or handling is logged and the loop continues.
 */
def run() {
  // Polls the request channel (timeout argument 300) until a request arrives, recording
  // idle time after every poll. The timed receiveRequest returns null when nothing arrived,
  // whereas (per the original note) the no-argument variant would wait indefinitely.
  def awaitRequest(): RequestChannel.Request = {
    var polled: RequestChannel.Request = null
    while (polled == null) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      val pollStartNs = SystemTime.nanoseconds
      polled = requestChannel.receiveRequest(300)
      val idleNs = SystemTime.nanoseconds - pollStartNs
      aggregateIdleMeter.mark(idleNs / totalHandlerThreads)
    }
    polled
  }

  while (true) {
    try {
      val req = awaitRequest()

      if (req eq RequestChannel.AllDone) {
        // Shutdown marker: per the original note, each handler's shutdown enqueues AllDone
        // so that its run loop exits.
        debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
        return
      }
      req.requestDequeueTimeMs = SystemTime.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      // Dispatch: processing moves to KafkaApis, which routes to the handler for the request type.
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
def handle(request: RequestChannel.Request) { try { trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal)) ApiKeys.forId(request.requestId) match { // 可以看到,所有的请求都是流经同一个request queue,没有优先级的区别 case ApiKeys.PRODUCE => handleProducerRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request) case ApiKeys.METADATA => handleTopicMetadataRequest(request) case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request) case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request) case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request) case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request) case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request) case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request) case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request) case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => if (request.requestObj != null) { request.requestObj.handleError(e, requestChannel, request) error("Error when handling request %s".format(request.requestObj), e) } else { val response = request.body.getErrorResponse(request.header.apiVersion, e) val respHeader = new ResponseHeader(request.header.correlationId)
/* If request doesn't have a default error response, we just close the connection. For example, when produce request has acks set to 0 */ if (response == null) requestChannel.closeConnection(request.processor, request) else requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
// the callback for sending a fetch response // 2. 定义回调函数来生成fetch response def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
val convertedPartitionData = // Need to down-convert message when consumer only takes magic value 0. if (fetchRequest.versionId <= 1) { responsePartitionData.map { case (tp, data) =>
// We only do down-conversion when: // 1. The message format version configured for the topic is using magic value > 0, and // 2. The message set contains message whose magic > 0 // This is to reduce the message format conversion as much as possible. The conversion will only occur // when new message format is used for the topic and we see an old request. // Please note that if the message format is changed from a higher version back to lower version this // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0 // without format down conversion. val convertedData = if (replicaManager.getMessageFormatVersion(tp).exists(_ > Message.MagicValue_V0) && !data.messages.isMagicValueInAllWrapperMessages(Message.MagicValue_V0)) { trace(s"Down converting message to V0 for fetch request from ${fetchRequest.clientId}") new FetchResponsePartitionData(data.error, data.hw, data.messages.asInstanceOf[FileMessageSet].toMessageFormat(Message.MagicValue_V0)) } else data
val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData
mergedPartitionData.foreach { case (topicAndPartition, data) => if (data.error != Errors.NONE.code) debug(s"Fetch request with correlation id ${fetchRequest.correlationId} from client ${fetchRequest.clientId} " + s"on partition $topicAndPartition failed due to ${Errors.forCode(data.error).exceptionName}") // record the bytes out metrics only when the response is being sent BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) }
// Builds the fetch response from the merged partition data and queues it on the request channel.
// `delayTimeMs` is passed through into the FetchResponse.
def fetchResponseCallback(delayTimeMs: Int) {
  trace(s"Sending fetch response to client ${fetchRequest.clientId} of " +
    s"${convertedPartitionData.values.map(_.messages.sizeInBytes).sum} bytes")
  val fetchResponse = FetchResponse(fetchRequest.correlationId, mergedPartitionData, fetchRequest.versionId, delayTimeMs)
  // Uses the dedicated FetchResponseSend rather than a generic buffer-backed send.
  val responseSend = new FetchResponseSend(request.connectionId, fetchResponse)
  requestChannel.sendResponse(new RequestChannel.Response(request, responseSend))
}
// When this callback is triggered, the remote API call has completed request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
// Do not throttle replication traffic if (fetchRequest.isFromFollower) { fetchResponseCallback(0) } else { quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId, FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId), fetchResponseCallback) } }
if (authorizedRequestInfo.isEmpty) sendResponseCallback(Map.empty) else { // 3. call the replica manager to fetch messages from the local replica replicaManager.fetchMessages( fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes, authorizedRequestInfo, sendResponseCallback) } }
// A request is tied to the processor that received it, because the matching response
// has to go back through that same processor.
case class Request(processor: Int,
                   connectionId: String,
                   session: Session,
                   private var buffer: ByteBuffer,
                   startTimeMs: Long,
                   securityProtocol: SecurityProtocol) {
  // These need to be volatile because the readers are in the network thread and the writers are in the request
  // handler threads or the purgatory threads.
  // They record when the request passed each stage. Per the original note, they hold millisecond
  // timestamps until updateRequestMetrics turns them into the final figures.
  @volatile var requestDequeueTimeMs = -1L
  @volatile var apiLocalCompleteTimeMs = -1L
  @volatile var responseCompleteTimeMs = -1L
  @volatile var responseDequeueTimeMs = -1L
  @volatile var apiRemoteCompleteTimeMs = -1L

  // Consumes the first two bytes of the buffer; the statements below depend on this read position.
  val requestId = buffer.getShort()

  // TODO: this will be removed once we migrated to client-side format
  // for server-side request / response format
  // NOTE: this map only includes the server-side request/response handlers. Newer
  // request types should only use the client-side versions which are parsed with
  // o.a.k.common.requests.AbstractRequest.getRequest()
  private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse] =
    Map(
      ApiKeys.FETCH.id -> FetchRequest.readFrom,
      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
    )

  // TODO: this will be removed once we migrated to client-side format
  // null when there is no server-side deserializer for this request id.
  val requestObj = keyToNameAndDeserializerMap.get(requestId).map(deserialize => deserialize(buffer)).orNull

  // if we failed to find a server-side mapping, then try using the
  // client-side request / response format
  val header: RequestHeader =
    if (requestObj != null) null
    else {
      // Go back to the start of the buffer so the header parser sees the api key again.
      buffer.rewind
      try RequestHeader.parse(buffer)
      catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
      }
    }

  val body: AbstractRequest =
    if (requestObj != null) null
    else
      try {
        // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
        val unsupportedApiVersionsRequest =
          header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)
        if (unsupportedApiVersionsRequest)
          new ApiVersionsRequest
        else
          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
      } catch {
        case ex: Throwable =>
          throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
      }

  // Parsing is finished; drop the reference to the raw buffer.
  buffer = null
  private val requestLogger = Logger.getLogger("kafka.request.logger")

  trace("Processor %d received request : %s".format(processor, requestDesc(true)))
}
Response
1 2 3 4 5 6 7 8 9 10 11 12 13 14
case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) { request.responseCompleteTimeMs = SystemTime.milliseconds
/** * This interface models the in-progress sending of data to a destination identified by an integer id. */ public interface Send {
/** * The numeric id for the destination of this send */ public String destination();
/** * Is this send complete? */ public boolean completed();
/** * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send * to be completely written * @param channel The Channel to write to * @return The number of bytes written * @throws IOException If the write fails */ public long writeTo(GatheringByteChannel channel) throws IOException;