当前位置: 首页 > news >正文

怎么做美瞳网站津seo快速排名

怎么做美瞳网站,津seo快速排名,苹果个人开发者账号注册,设计之家室内设计整体架构如下所示: responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue 客户端发送的请求被Acceptor转发给Processor处理处理器将请求放到RequestChannel的requestQueue中KafkaRequestHandler取出reque…

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {def startup() {socketServer = new SocketServer(config, metrics, time, credentialProvider)socketServer.startup(startupProcessors = false)/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, ...)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)}}

SocketServer

  def startup(startupProcessors: Boolean = true) {this.synchronized {...createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {...endpoints.foreach { endpoint =>...val acceptor = new Acceptor(endpoint, ...)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {currentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We don't want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable => throw ecase e: Throwable => error("Error occurred", e)}}} finally {debug("Closing server socket and selector.")CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)shutdownComplete()}}

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {startupComplete()try {while (isRunning) {try {// 在新分到的客户端连接上注册OP_READ事件configureNewConnections()// 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送processNewResponses()// selector轮询各种事件,读取请求或者发送响应poll()// 封装selector.completedReceives中的请求,放入requestQueueprocessCompletedReceives()// 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)processCompletedSends()processDisconnected()} catch {...}}} finally {...}}

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {...for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(..., requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {...def run() {while (!stopped) {val req = requestChannel.receiveRequest(300)req match {case RequestChannel.ShutdownRequest =>shutdownComplete.countDown()returncase request: RequestChannel.Request =>try {request.requestDequeueTimeNanos = endTimeapis.handle(request)} catch {case e: FatalExitError =>shutdownComplete.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)} finally {request.releaseBuffer()}case null => // continue}}shutdownComplete.countDown()}}
http://www.dt0577.cn/news/49118.html

相关文章:

  • 网站开发属于什么大学专业百度快照是干什么的
  • 北京市著名的网站制作公司网站关键词优化公司哪家好
  • 网站建设明细报价单seo网站优化推广教程
  • 网站改版新闻360识图
  • 做淘宝客网站违法吗一链一网一平台
  • 什么对网站建设起到计划和指导作用体彩足球竞彩比赛结果韩国比分
  • 做网站买什么笔记本好东莞公司网上推广
  • 网站建设类公司可以拿哪些项目资金如何推广好一个产品
  • 做网站封面素材图兰州网络推广优化服务
  • 哈尔滨网站seocpa推广平台
  • 做画册的国外网站google搜索关键词热度
  • wordpress 记录访问ip网站建设网络推广seo
  • 手机号网站源码seo排名优化北京
  • 基于漏斗模型网站关键路径分析网文网站排名
  • 最好的免费的黄页网站百度快照
  • 客户管理系统crm360搜索引擎优化
  • 龙岩网站设计一般要多久网页模板设计
  • 现在一般做B2B类网站用vue搜索引擎优化seo信息
  • 网站ip做网站合肥网络推广服务
  • 淄博做网站公司有哪些免费搭建网站
  • 龙岗-网站建设深圳信科seo排名哪家正规
  • 贵阳网站建设seo公众号开发线上营销推广方法
  • 装修网app官网搜狗seo怎么做
  • 做 暧视频在线观看网站优化关键词排名外包
  • 怎样评价一个网站做的好与不好百度指数上多少就算热词
  • 购物网站怎么做樱桃bt磁力天堂
  • 自己如何做外贸公司网站小红书kol推广
  • 网站建设的公司系统规划方案b站怎么推广自己的视频
  • wordpress排除首页显示刷关键词排名seo
  • 做网站 没内容百度电话客服24小时人工