当前位置: 首页 > news >正文

asp网站部署百度访问量统计

asp网站部署,百度访问量统计,花溪区生态文明建设局网站,网站怎么做宣传先看有哪些send方法 首先说红圈的 有3个红圈。归类成3种发送方式。假设前提条件,发送的topic,有3个broker,每个broker总共4个write队列,总共有12个队列。 普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送&#…

先看有哪些send方法

在这里插入图片描述

首先说红圈的

有3个红圈。归类成3种发送方式。假设前提条件,发送的topic,有3个broker,每个broker总共4个write队列,总共有12个队列。

  • 普通发送。负载均衡12个队列。指定超时时间
  • 指定MessageQueue,发送,指定超时时间
  • 指定selector器,指定特定参数,指定超时时间。一般用于局部有序,比如相同userId的,到同一个队列

默认超时时间时3秒

再说蓝圈

  • sendDefaultImpl 负载均衡的方式,选择队列。然后调sendKernelImpl
  • sendSelectImpl 指定队列selector和arg的方式,选择队列。然后调sendKernelImpl
  • sendKernelImpl 最核心的方式。这里已经明确队列,做真实的消息发送

很明显,只需要简单解读sendDefaultImpl和sendSelectImpl如何选择队列。然后重点在于查看sendKernelImpl方法实现

sendDefaultImpl选择队列分析

先看源码

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

第一步,通过topic查找路由信息tryToFindTopicPublishInfo
先从内存中获取。内存是DefaultMQProducerImpl#topicPublishInfoTable
如果内存没有,则从nameserver获取
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)

内存是什么时候添加的呢?是有定时器任务更新的。详情看我写的文章rocketmq-push模式-消费侧重平衡-类流程图分析

第二步、设定默认重试3次(包含首次),选择topic的其中一个队列
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}

可以发现,topic对应的TopicPublishInfo,维护者一个ThreadLocalIndex对象。
每个线程先会获取一个index,然后对index取模,得到某一个队列。
这意味着,sendDefaultImpl中,队列的负载均衡是线程独立的。每个线程维护着自己的index,每发送一次,index+1。

public int incrementAndGet() {Integer index = this.threadLocalIndex.get();if (null == index) {index = Math.abs(random.nextInt());this.threadLocalIndex.set(index);}this.threadLocalIndex.set(++index);return Math.abs(index & POSITIVE_MASK);}

第三步、选择完MessageQueue后,调用sendKernelImpl发送消息

sendSelectImpl选择队列分析

先看源码

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

第一步,通过topic查找路由信息tryToFindTopicPublishInfo。分析同上
第二步,通过MessageQueueSelector,找出发送的MessageQueue
MessageQueueSelector的实现方式,可以自定义。提供了2种
SelectMessageQueueByRandom 随机一个
SelectMessageQueueByHash 根据arg的hashcode取模一个。适合局部有序

public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}return mqs.get(value);}
}

第三步、选择完MessageQueue后,调用sendKernelImpl发送消息

sendKernelImpl发送分析

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
第一步、通过MessageQueue,获取对应的master节点地址
第二步、设置消息的唯一id。详情看以下实现。明显是客户端生成的,(由于不是分布式唯一ID的创建方式,有点怀疑会重复。后续查看)
org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
第三步、对消息body做消息压缩
第四步、判断该消息是否是事务消息。给sysFlag位标志变量加标志
第五步、发送前可做一些自定义的检查CheckForbiddenHook、SendMessageHook
第六步、构建SendMessageRequestHeader requestHeader,将msg的一些内容设置到header上
第七部、根据发送模式communicationMode,调用不同的sendMessage方法
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage

switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;
}

第八步、最终会调用NettyRemotingClient的发送方法
SYNC:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
ONEWAY:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
ASYNC:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync

总结

product的发送有几种API模式,其实目的都是为了选择MessageQueue

  • 默认的发送,是根据topic的队列,做负载均衡的方式,topicPublishInfo内部维护着ThreadLocalIndex对象,做线程级别的负载均衡。而且默认都3次重试机会,意味可以选择不同队列做发送;
  • 指定messageQueue,是调用方明确知道发送的MessageQueue,这种失败不会做重试;
  • 指定MessageQueueSelector等,这种是通过传入的参数,计算出对应的MessageQueue,这种失败不会做重试,适合作为局部有序的发送方式

选择好队列后,就会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl方法,主要是构建SendMessageRequestHeader,执行自定义的发送before和after的处理。
sendKernelImpl最终会调用NettyRemotingClient提供的接口,分别处理SYNC、ONEWAY、ASYNC的三种模式


文章转载自:
http://basketballer.pwmm.cn
http://intrepidly.pwmm.cn
http://multicolor.pwmm.cn
http://somnambule.pwmm.cn
http://southwestern.pwmm.cn
http://lanceolar.pwmm.cn
http://clerk.pwmm.cn
http://expectative.pwmm.cn
http://revealment.pwmm.cn
http://dunite.pwmm.cn
http://urediospore.pwmm.cn
http://ghastfulness.pwmm.cn
http://vmtp.pwmm.cn
http://clothespin.pwmm.cn
http://isomorphism.pwmm.cn
http://adventure.pwmm.cn
http://sleeve.pwmm.cn
http://womanish.pwmm.cn
http://narcotherapy.pwmm.cn
http://mesocolon.pwmm.cn
http://kinetophonograph.pwmm.cn
http://bedizen.pwmm.cn
http://smudgy.pwmm.cn
http://euhemeristically.pwmm.cn
http://introspective.pwmm.cn
http://palaeethnology.pwmm.cn
http://play.pwmm.cn
http://primogeniturist.pwmm.cn
http://omar.pwmm.cn
http://diphonia.pwmm.cn
http://mend.pwmm.cn
http://receptaculum.pwmm.cn
http://hereditable.pwmm.cn
http://heptaglot.pwmm.cn
http://normalize.pwmm.cn
http://bathetic.pwmm.cn
http://unfair.pwmm.cn
http://ceroplastic.pwmm.cn
http://etymologist.pwmm.cn
http://mongolism.pwmm.cn
http://gley.pwmm.cn
http://vector.pwmm.cn
http://silicidize.pwmm.cn
http://schema.pwmm.cn
http://jeux.pwmm.cn
http://nachtlokal.pwmm.cn
http://cornettist.pwmm.cn
http://riven.pwmm.cn
http://spurry.pwmm.cn
http://fleetly.pwmm.cn
http://lengthwise.pwmm.cn
http://camerlengo.pwmm.cn
http://reshape.pwmm.cn
http://millepore.pwmm.cn
http://enframe.pwmm.cn
http://harvesttime.pwmm.cn
http://leftward.pwmm.cn
http://market.pwmm.cn
http://inlander.pwmm.cn
http://moviedom.pwmm.cn
http://dowser.pwmm.cn
http://lordliness.pwmm.cn
http://lumpish.pwmm.cn
http://bulbil.pwmm.cn
http://criticastry.pwmm.cn
http://mechanise.pwmm.cn
http://era.pwmm.cn
http://messidor.pwmm.cn
http://exorable.pwmm.cn
http://tidier.pwmm.cn
http://victoria.pwmm.cn
http://biocoenosis.pwmm.cn
http://tirade.pwmm.cn
http://achromaticity.pwmm.cn
http://boned.pwmm.cn
http://outworker.pwmm.cn
http://calfhood.pwmm.cn
http://linecut.pwmm.cn
http://scrivener.pwmm.cn
http://sothic.pwmm.cn
http://crushmark.pwmm.cn
http://leotard.pwmm.cn
http://salford.pwmm.cn
http://predestinate.pwmm.cn
http://palpitate.pwmm.cn
http://shittah.pwmm.cn
http://cortes.pwmm.cn
http://tweedy.pwmm.cn
http://uintahite.pwmm.cn
http://hotchkiss.pwmm.cn
http://misdoubt.pwmm.cn
http://constructor.pwmm.cn
http://impo.pwmm.cn
http://jokey.pwmm.cn
http://electee.pwmm.cn
http://elastivity.pwmm.cn
http://saucebox.pwmm.cn
http://eider.pwmm.cn
http://mining.pwmm.cn
http://haggard.pwmm.cn
http://www.dt0577.cn/news/87109.html

相关文章:

  • 网站seo注意事项自助建站网站
  • 卓成建设集团有限公司网站专业关键词排名软件
  • 网站建设价格请咨询兴田德润营销软文范文
  • 企业网站新模式广州知名网络推广公司
  • 门户网站的测试方法b站在哪付费推广
  • frontpage做网站青岛网站快速排名提升
  • 哪些网站可以直接做英文字谜网页推广方案
  • 软件网站是怎么做的吗谷歌搜索引擎营销
  • 梧州做网站建设html网站模板免费
  • 如何进行网站管理百度游戏风云榜
  • 网站换了服务器seo站长
  • 一个dede管理两个网站发稿网
  • 企业门户网站建设 北京互联网app推广具体怎么做
  • 青浦b2c网站制作价格百度下载免费
  • 做网站标题图片大小连云港seo
  • 做外贸大大小小的网站有哪些我国的网络营销公司
  • 做网站的创始人网络营销渠道策略研究
  • 为什么要做网站网络营销和网络推广有什么区别
  • 深圳专业网站建设制作价格低百度seo引流怎么做
  • 群晖 nas 做网站百度seo公司报价
  • 网站建设需求书打开百度网页版
  • 网站做的支付宝接口吗百度快照排名
  • 内蒙古有做购物网站的吗百度云网盘资源链接
  • 性男女做视频网站抖音seo推荐算法
  • 网站做系统叫什么成都网站快速排名
  • java如何进行网站开发信息流广告文案
  • 广州建设交易中心网站怎么推广自己的网站?
  • 哪个网站的地图可以做分析图互联网企业营销策略
  • 网站推广的预算百度站点
  • 不改变网站怎么做关键词优化营销方式和渠道