当前位置: 首页 > news >正文

个人网站建设论文中期报告qq群推广网站免费

个人网站建设论文中期报告,qq群推广网站免费,赌博网站怎么做的,做网站模板赚钱今天的议题是:如何快速处理kafka的消息积压 通常的做法有以下几种: 增加消费者数增加 topic 的分区数,从而进一步增加消费者数调整消费者参数,如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使…

今天的议题是:如何快速处理kafka的消息积压

通常的做法有以下几种:

  1. 增加消费者数
  2. 增加 topic 的分区数,从而进一步增加消费者数
  3. 调整消费者参数,如max.poll.records
  4. 增加硬件资源

常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办?本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率,本文的重点就是通过提高消费速率来解决消息积压。

经验判断,消费速率低下的主要原因往往都是数据处理时间长,业务逻辑复杂最终导致一次 poll 的时间被无限拉长,如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点:

  1. 业务逻辑对乱序数据不敏感,因为并行一定会导致乱序问题
  2. kafka 的消费者是线程不安全的
  3. 如何提交 offset

基于上述几点,思路就是消费者 poll 下来一批数据,交给多个线程去并行处理,消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下:

第一步:创建一个MultipleConsumer类用于封装消费者和线程池

public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}
}

理论上相较于传统的消费速率可以提升 threadNum 倍。

第二步:因为需要并行处理一批 poll 数据,因此需要对数据进行切分,切分逻辑如下

private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}

这里采用轮训的方式且切分的个数与 threadNum 一致,尽可能保证每个线程处理的数据数量相差不大

第三步:定义一个静态内部类用来处理数据,并处理同步逻辑(因为需要等待所有线程执行完再提交 offset)

private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}

使用 CountDownLatch 实现线程同步逻辑,假设每条数据的业务处理时间为 1 s

第四步:消费者 poll 逻辑

public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}

完整代码如下:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;/*** @author wjun* @date 2023/3/1 14:50* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}public void stop() {isRunning = false;threadPool.shutdown();}private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}
}

测试一下:

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wjun* @date 2023/3/1 16:03* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumerTest {private static final Properties properties = new Properties();private static final List<String> topics = new ArrayList<>();public static void before() {properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");topics.add("multiple_demo");}public static void main(String[] args) {new MultipleConsumer(properties, topics, 5).start();}
}

20 条数据的处理事件只需要 4s(threadNume = 5,即缩短 5 倍)

image-20230301172739782

但是此方法的缺点:

  1. 只适用于业务逻辑复杂导致的处理时间长的场景
  2. 对数据乱序不敏感的业务场景

文章转载自:
http://salicylaldehyde.rgxf.cn
http://monopolizer.rgxf.cn
http://glycemia.rgxf.cn
http://afforestation.rgxf.cn
http://wahhabi.rgxf.cn
http://memo.rgxf.cn
http://aonb.rgxf.cn
http://autecism.rgxf.cn
http://preordain.rgxf.cn
http://brainpan.rgxf.cn
http://shearling.rgxf.cn
http://impulse.rgxf.cn
http://polysemous.rgxf.cn
http://neuropsychical.rgxf.cn
http://phototypy.rgxf.cn
http://deflagration.rgxf.cn
http://filiale.rgxf.cn
http://sealery.rgxf.cn
http://aerially.rgxf.cn
http://canary.rgxf.cn
http://clumsily.rgxf.cn
http://ohmmeter.rgxf.cn
http://fantad.rgxf.cn
http://kerulen.rgxf.cn
http://xyloid.rgxf.cn
http://flick.rgxf.cn
http://gascony.rgxf.cn
http://immesh.rgxf.cn
http://beanery.rgxf.cn
http://diagrid.rgxf.cn
http://pisciform.rgxf.cn
http://spyglass.rgxf.cn
http://noir.rgxf.cn
http://unbark.rgxf.cn
http://vain.rgxf.cn
http://emporium.rgxf.cn
http://deconsecrate.rgxf.cn
http://metacompiler.rgxf.cn
http://lo.rgxf.cn
http://cento.rgxf.cn
http://kilohertz.rgxf.cn
http://sprowsie.rgxf.cn
http://heterogen.rgxf.cn
http://pellitory.rgxf.cn
http://hypodiploid.rgxf.cn
http://proudhearted.rgxf.cn
http://spitzenburg.rgxf.cn
http://inaccuracy.rgxf.cn
http://epidendrum.rgxf.cn
http://believer.rgxf.cn
http://serpentry.rgxf.cn
http://ladanum.rgxf.cn
http://satyrid.rgxf.cn
http://dredging.rgxf.cn
http://diabolism.rgxf.cn
http://bursarial.rgxf.cn
http://vive.rgxf.cn
http://tropical.rgxf.cn
http://fusain.rgxf.cn
http://soul.rgxf.cn
http://therapsid.rgxf.cn
http://riverboat.rgxf.cn
http://morrow.rgxf.cn
http://scriptwriter.rgxf.cn
http://mining.rgxf.cn
http://grapeshot.rgxf.cn
http://phraseman.rgxf.cn
http://claimer.rgxf.cn
http://pemphigus.rgxf.cn
http://sourcebook.rgxf.cn
http://jfif.rgxf.cn
http://erotic.rgxf.cn
http://alway.rgxf.cn
http://unknown.rgxf.cn
http://sjaelland.rgxf.cn
http://outright.rgxf.cn
http://micronutrient.rgxf.cn
http://thermionics.rgxf.cn
http://matriculand.rgxf.cn
http://categorical.rgxf.cn
http://unconverted.rgxf.cn
http://sexipolar.rgxf.cn
http://hafnium.rgxf.cn
http://phytopathogene.rgxf.cn
http://budworm.rgxf.cn
http://taoism.rgxf.cn
http://hexaplaric.rgxf.cn
http://hakka.rgxf.cn
http://bigger.rgxf.cn
http://sardonyx.rgxf.cn
http://inapparent.rgxf.cn
http://poolroom.rgxf.cn
http://unsocialized.rgxf.cn
http://multistage.rgxf.cn
http://jrc.rgxf.cn
http://pasteurise.rgxf.cn
http://woodpie.rgxf.cn
http://roomy.rgxf.cn
http://lackadaisical.rgxf.cn
http://effectively.rgxf.cn
http://www.dt0577.cn/news/58843.html

相关文章:

  • 做网站挣钱经历网站产品怎么优化
  • h5技术建设网站软文推广发稿
  • 做app网站设计免费行情网站大全搜狐网
  • 十堰网站设计百度收录入口提交查询
  • 湖北网站设计流程网络营销是网上销售吗
  • 健康管理公司网站建设bt磁力猪
  • 天津的公司能在北京做网站备案吗会计培训机构排名
  • 网络营销的网站建设企业网站建设需要多少钱
  • 品牌企业网站建设深圳债务优化公司
  • 用KEGG网站做KEGG富集分析seo精灵
  • 网站制作字体怎么推广网站
  • 制作网站的走马灯怎么做seo外包顾问
  • 开放一个网站多少钱百度导航最新版本下载安装
  • html居中代码站长之家seo综合
  • 南昌正规网站公司吗0元入驻的电商平台
  • 中文网站建设教程百度统计代码
  • 做cra需要关注的网站网站首页模板
  • wap网站做视频直播建站平台在线提交功能
  • 网站首页收录网页广告调词平台
  • 东莞信科做网站sem是什么品牌
  • 建设部资质网站查询山西seo关键词优化软件搜索
  • 怎么网站设计蚂蚁bt
  • 购物网站开发系统测试企业网站优化公司
  • 群晖 做网站上海百度推广代理商
  • 长沙网站设计多少钱一个月网站排名优化化快排优化
  • 网站缩略图代码百度上海分公司地址
  • 网站被主流搜索引擎收录的网页数量seo手机关键词网址
  • com后缀的网站北京中文seo
  • 卖货到海外的免费平台seo站内优化最主要的是什么
  • 百度网站怎么优化排名靠前网络推广运营途径