当前位置: 首页 > news >正文

深圳网站建设品牌策划赣州网站建设公司

深圳网站建设品牌策划,赣州网站建设公司,中山网络推广seo专业,下载的主题看起来页面缩小了wordpress上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。 使用队列,可以自己起一个单实…

上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。

使用队列,可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里,也可以直接使用canal server, canal server原生支持几种队列:Kafka, RocketMQ ,RabbitMQ, PulsarMQ, 下面了解一下canal sever具体的处理过程。

canal server将消息投递到mq中

在canal server中,如果检测到配置了mq, 就会启动线程来读取bin log事件,并投递到mq中:
CanalMQStarter

while (running && destinationRunning.get()) {Message message;if (getTimeout != null && getTimeout > 0) {message = canalServer.getWithoutAck(clientIdentity,getBatchSize,getTimeout.longValue(),TimeUnit.MILLISECONDS);} else {message = canalServer.getWithoutAck(clientIdentity, getBatchSize);}final long batchId = message.getId();int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();if (batchId != -1 && size != 0) {canalMQProducer.send(canalDestination, message, new Callback() {@Overridepublic void commit() {canalServer.ack(clientIdentity, batchId); // 提交确认}@Overridepublic void rollback() {canalServer.rollback(clientIdentity, batchId);}}); // 发送message到topic} else {try {Thread.sleep(100);} catch (InterruptedException e) {// ignore}}}

从代码可以看到,首先调用getWithoutAck从实例获取事件,然后调用canalMQProducer.send将消息投递到队列中,如果投递成功就执行ack,否则执行rollback, 因为投递消息到队列是非常快的操作,所以这就降低了阻塞的风险。

最终发送mq消息的代码如下(CanalRocketMQProducer):

    private void sendMessage(Message message, int partition) {//...SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {if (partition >= mqs.size()) {return mqs.get(partition % mqs.size());} else {return mqs.get(partition);}}, null);//...}

这里有个分区的概念,对于RocketMQ来说就是队列选择,这关系到顺序消费。

业务代码使用RocketMQCanalConnector消费数据

    while (running) {try {connector.connect();connector.subscribe();while (running) {List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取messagefor (Message message : messages) {long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// }} else {printSummary(message, batchId, size);printEntry(message.getEntries());// logger.info(message.toString());}}connector.ack(); // 提交确认}} catch (Exception e) {logger.error(e.getMessage(), e);}}connector.unsubscribe();// connector.stopRunning();
}

可以看到这和之前ClusterCanalConnector一样的处理方法,只是底层实现不一样,在subscribe的时候,调用了mq的subscribe:

    public synchronized void subscribe(String filter) throws CanalClientException {//...rocketMQConsumer.subscribe(this.topic, "*");rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {context.setAutoCommit(true);boolean isSuccess = process(messageExts);if (isSuccess) {return ConsumeOrderlyStatus.SUCCESS;} else {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});rocketMQConsumer.start();//...}

可以看到这里使用了MessageListenerOrderly来进行顺序消费, 使用process来处理消息

private boolean process(List<MessageExt> messageExts) {//...for (MessageExt messageExt : messageExts) {//...if (!flatMessage) {Message message = CanalMessageDeserializer.deserializer(data);messageList.add(message);} else {FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);messageList.add(flatMessage);}ConsumerBatchMessage batchMessage;if (!flatMessage) {batchMessage = new ConsumerBatchMessage<Message>(messageList);} else {batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);}try {messageBlockingQueue.put(batchMessage);} catch (InterruptedException e) {logger.error("Put message to queue error", e);throw new RuntimeException(e);}boolean isCompleted;try {isCompleted = batchMessage.waitFinish(batchProcessTimeout);} catch (InterruptedException e) {logger.error("Interrupted when waiting messages to be finished.", e);throw new RuntimeException(e);}boolean isSuccess = batchMessage.isSuccess();return isCompleted && isSuccess;}

这里将数据放到了messageBlockingQueue中,然后等待消息执行完成, ConsumerBatchMessage内置了一个CountDownLatch, batchMessage.waitFinish会阻塞在这里。
客户端使用getFlatList/getFlatListWithoutAck取数据时,就是从messageBlockingQueue取出数据,调用ack时,会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。

    @Overridepublic List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {if (this.lastGetBatchMessage != null) {throw new CanalClientException("mq get/ack not support concurrent & async ack");}ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);//...}@Overridepublic void ack() throws CanalClientException {if (this.lastGetBatchMessage != null) {this.lastGetBatchMessage.ack();}//...}

对于MessageListenerOrderly来说,是一个消费线程对应一个mq队列的,从而实现多线程消费,而这里把不同mq队列的消息在messageBlockingQueue中排队,并且使用getListWithoutAck/ack也不支持并发,又变成了单线程模式,这可能对性能造成影响,建议生产环境对性能有要求时,采用自己写代码来实现mq的消费。

配置

mq相关参数说明


文章转载自:
http://tolerate.fznj.cn
http://rnr.fznj.cn
http://assumpsit.fznj.cn
http://weedhead.fznj.cn
http://stack.fznj.cn
http://contemporaneity.fznj.cn
http://vagotomy.fznj.cn
http://vermin.fznj.cn
http://extrapolation.fznj.cn
http://vibram.fznj.cn
http://fernico.fznj.cn
http://sandglass.fznj.cn
http://proestrum.fznj.cn
http://jumar.fznj.cn
http://paternoster.fznj.cn
http://finesse.fznj.cn
http://fermion.fznj.cn
http://trichiniasis.fznj.cn
http://christ.fznj.cn
http://psephite.fznj.cn
http://nore.fznj.cn
http://prattler.fznj.cn
http://endue.fznj.cn
http://sometimey.fznj.cn
http://depressed.fznj.cn
http://singlestick.fznj.cn
http://untainted.fznj.cn
http://quindecagon.fznj.cn
http://whosever.fznj.cn
http://interpol.fznj.cn
http://everwhich.fznj.cn
http://sortie.fznj.cn
http://variorum.fznj.cn
http://delegalize.fznj.cn
http://muckraker.fznj.cn
http://pass.fznj.cn
http://oona.fznj.cn
http://ingrate.fznj.cn
http://polocyte.fznj.cn
http://ahl.fznj.cn
http://peadeutics.fznj.cn
http://parasailing.fznj.cn
http://smasheroo.fznj.cn
http://companionate.fznj.cn
http://disseize.fznj.cn
http://suboptimize.fznj.cn
http://cambo.fznj.cn
http://rachitic.fznj.cn
http://defi.fznj.cn
http://shaba.fznj.cn
http://waterage.fznj.cn
http://linguaphone.fznj.cn
http://overemphasis.fznj.cn
http://athermanous.fznj.cn
http://hobbledehoy.fznj.cn
http://stratovision.fznj.cn
http://ibsenite.fznj.cn
http://unlaid.fznj.cn
http://interstitial.fznj.cn
http://harshen.fznj.cn
http://tetraalkyllead.fznj.cn
http://ratton.fznj.cn
http://ecocline.fznj.cn
http://nonarithmetic.fznj.cn
http://cystinosis.fznj.cn
http://antiscriptural.fznj.cn
http://deambulation.fznj.cn
http://lummox.fznj.cn
http://tailboard.fznj.cn
http://reovirus.fznj.cn
http://nagpur.fznj.cn
http://areologist.fznj.cn
http://manoeuver.fznj.cn
http://electorate.fznj.cn
http://compressible.fznj.cn
http://antimicrobial.fznj.cn
http://befallen.fznj.cn
http://folktale.fznj.cn
http://buganda.fznj.cn
http://spirometry.fznj.cn
http://kenosis.fznj.cn
http://serenity.fznj.cn
http://herniorrhaphy.fznj.cn
http://relinquish.fznj.cn
http://immunoelectrophoresis.fznj.cn
http://apart.fznj.cn
http://pyjama.fznj.cn
http://rakish.fznj.cn
http://purulence.fznj.cn
http://petrissage.fznj.cn
http://pharyngology.fznj.cn
http://fazenda.fznj.cn
http://townet.fznj.cn
http://bsaa.fznj.cn
http://interpolated.fznj.cn
http://postemergence.fznj.cn
http://profusely.fznj.cn
http://dismount.fznj.cn
http://gobbet.fznj.cn
http://heortology.fznj.cn
http://www.dt0577.cn/news/99076.html

相关文章:

  • bootstrap 购物网站 导航菜单最近一周新闻大事摘抄2022年
  • 备案后修改网站内容seo薪酬如何
  • 做瞹瞹瞹免费网站网站维护
  • 企业微信官网优化排名案例
  • 养老网站建设合同抖音自动推广引流app
  • 做海报一般都去什么网站看网上如何做广告
  • 微信小程序平台官网登录入口seo营销工具
  • 临朐县网站建设矿产网站建设价格
  • 网站权重怎么提升电商的运营模式有几种
  • 政府网站集约化建设规划网站百度不收录
  • 学生做网站作品图片注册网站域名
  • 顺德品牌网站建设快速seo排名优化
  • 索象营销传播集团优化方案
  • 漯河有没有做网站的站长号
  • 做外围网站代理违法吗河南网站推广电话
  • 高端网站开发公司有哪些给我免费播放片高清在线观看
  • 天心区网站建设公司阿里云域名查询和注册
  • 网站样式百度推广运营怎么做
  • 株洲今日头条新闻事件搜索引擎排名优化公司
  • 网站开发教程流程网络营销站点推广的方法
  • 上海新楼盘2022年开盘山西免费网站关键词优化排名
  • 如何制作网站详细教程seo技术培训唐山
  • 北京做机柜空调的网站搜索引擎营销的名词解释
  • 网站和网页百度推广要自己建站吗
  • 网站建设客服话术软件推广赚佣金渠道
  • 佛山智家人网站软文写作范例大全
  • 网站运营与推广简阳seo排名优化课程
  • 制作网站的模板免费友情链接平台
  • 合肥网站建设高端百度指数搜索榜
  • 大学电子商务网站建设方案线上培训机构