当前位置: 首页 > news >正文

手机微网站制作app注册推广平台

手机微网站制作,app注册推广平台,宠物医院网站开发背景,宿迁公司做网站前言 由于 Kafka 的写性能非常高&#xff0c;因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况&#xff0c;我们可以通过并发消费、批量消费的方法进行解决。 一、新建一个maven工程&#xff0c;添加kafka依赖 <dependency><groupId>org.springframe…

前言

由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。

一、新建一个maven工程,添加kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

二、yaml配置文件

spring:kafka:bootstrap-servers: 127.0.0.1:9002producer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerconsumer:group-id: test-consumer-group# 当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时,如何重置 Offset# earliest:表示自动重置到 partition 的最小 offset# latest:默认为 latest,表示自动重置到 partition 的最大 offset# none:不自动进行 offset 重置,抛auto-offset-reset: latest# 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offsetenable-auto-commit: false## 当 auto.commit.enable=true 时,自动提交 Offset 的时间间隔,建议设置至少1000auto-commit-interval: 2000max-poll-records: 30heartbeat-interval: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间session.timeout.ms: 60000# 使用 Kafka 消费分组机制时,消费者发送心跳的间隔。这个值必须小于 session.timeout.ms,一般小于它的三分之一heartbeat.interval.ms: 3000# 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者max.poll.interval.ms: 300000request.timeout.ms: 600000listener:# 在侦听器容器中运行的线程数。concurrency: 2type: batchmax-poll-records: 50#当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,#spring-kafka提供了通过ackMode的值表示不同的手动提交方式#手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate# 消费者监听的topic不存在时,项目会报错,设置为falsemissing-topics-fatal: false

三、消息消费

手动提交非批量消费

  •   String 类型接入

     @KafkaListener(topics = {"test-topic"}, groupId = "test-consumer-group")public void onMessage(String message, Consumer consumer) {System.out.println("接收到的消息:" + message);consumer.commitSync();}

  • 使用注解方式获取消息头、消息体

         /*** 处理消息*/@KafkaListener(topics = "test-topic", groupId = "test-consumer-group")public void onMessage(@Payload String message,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,Acknowledgment ack) {try {ack.acknowledge();log.info("Consumer>>>>>>>>>>>>>end");} catch (Exception e) {log.error("Consumer.onMessage#error . message={}", message, e);throw new BizException("事件消息消费失败", e);}} 

 

手动提交批量消费

想要批量消费,首先要开启批量消费,通过listener.type属性设置为batch即可开启,看下代码吧:

spring:kafka:consumer:group-id: test-consumer-groupbootstrap-servers: 127.0.0.1:9092max-poll-records: 50 # 一次 poll 最多返回的记录数listener:type: batch # 开启批量消费

 

如上设置了启用批量消费和批量消费每次最多消费记录数。这里设置 max-poll-records是50,并不是说如果没有达到50条消息,我们就一直等待。而是说一次poll最多返回的记录数为50

  • ConsumerRecord类接收
        /*** kafka的批量消费监听器*/@KafkaListener(topics = "test-topic", groupId = "test-consumer-group")public void onMessage(List<ConsumerRecord<String, String>> records, Consumer consumer) {try {log.info("Consumer.batch#size={}", records == null ? 0 : records.size());if (CollectionUtil.isEmpty(records)) {//分别是commitSync(同步提交)和commitAsync(异步提交)consumer.commitSync();return;}for (ConsumerRecord<String, String> record : records) {String message = record.value();if (StringUtils.isBlank(message)) {continue;}//处理业务数据//doBuiness();}consumer.commitSync();log.info("Consumer>>>>>>>>>>>>>end");} catch (Exception e) {log.error("Consumer.onMessage#error .", e);throw new BizException("事件消息消费失败", e);}}

  • String类接收
     @KafkaListener(topics = {"test-topic"}, groupId = "test-consumer-group")public void onMessage(List<String> message, Consumer consumer) {System.out.println("接收到的消息:" + message);consumer.commitSync();}

  • 使用注解方式获取消息头、消息体,则也是使用 List 来接收:

    @Component
    public class KafkaConsumer {// 消费监听@KafkaListener(topics = {"test-topic"})public void listen2(@Payload List<String> data,@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) {System.out.println("收到"+ data.size() + "条消息:");System.out.println(data);System.out.println(topics);System.out.println(partitions);System.out.println(keys);System.out.println(tss);}
    }

  • 并发消费 

    再来看下并发消费,为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态

spring:kafka:consumer:group-id: test-consumer-groupbootstrap-servers: 127.0.0.1:9092max-poll-records: 50 # 一次 poll 最多返回的记录数listener:type: batch # 开启批量监听concurrency: 3 # 设置并发数

 

我们设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition

配置类方式

通过自定义配置类的方式也是可以的,但是相对yml配置来说还是有点麻烦的(不提倡)

/*** 消费者配置*/
@Configuration
public class KafkaConsumerConfig {/*** 消费者配置* @return*/public Map<String,Object> consumerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9002");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//并发数量factory.setConcurrency(3);//开启批量监听factory.setBatchListener(true);return factory;}
}

同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

四、Kafka参数调优

一、Consumer参数说明


1、enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认值是true。
为了尽量避免出现重复数据(假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费)和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。
如果把它设为true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
 

2、auto.commit.interval.ms
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

3、手动提交:commitSync/commitAsync
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。

相同点:都会将本次poll的一批数据最大的偏移量提交。
不同点:commitSync会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败,导致重复消费。

4、max.poll.records
Consumer每次调用poll()时取到的records的最大数。


二、Kafka消息积压、消费能力不足怎么解决?

如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,同时相应的增加消费者实例,消费者数=分区数(二者缺一不可)。
如果是下游的数据处理不及时,则可以提高每批次拉取的数量,通过max.poll.records这个参数可以调整。
单个消费者实例的消费能力提升,可以用多线程/线程池的方式并发消费提高单机的消费能力。


三、Kafka消费者如何进行流控?

将自动提交改成手动提交(enable.auto.commit=false),每次消费完再手动异步提交offset,之后消费者再去Broker拉取新消息,这样可以做到按照消费能力拉取消息,减轻消费者的压力。
 


文章转载自:
http://bearnaise.bnpn.cn
http://medline.bnpn.cn
http://distensibility.bnpn.cn
http://irritably.bnpn.cn
http://hotpress.bnpn.cn
http://biomedicine.bnpn.cn
http://exhedra.bnpn.cn
http://graunchy.bnpn.cn
http://megalosaur.bnpn.cn
http://dortour.bnpn.cn
http://need.bnpn.cn
http://milkmaid.bnpn.cn
http://leptodactyl.bnpn.cn
http://counterweigh.bnpn.cn
http://foveolar.bnpn.cn
http://thomism.bnpn.cn
http://sandrock.bnpn.cn
http://disarray.bnpn.cn
http://congeniality.bnpn.cn
http://pacifically.bnpn.cn
http://tvr.bnpn.cn
http://spectatoritis.bnpn.cn
http://woven.bnpn.cn
http://kwangju.bnpn.cn
http://fecit.bnpn.cn
http://busses.bnpn.cn
http://imphal.bnpn.cn
http://stance.bnpn.cn
http://betaken.bnpn.cn
http://fonda.bnpn.cn
http://moonrise.bnpn.cn
http://spectacular.bnpn.cn
http://algesia.bnpn.cn
http://vir.bnpn.cn
http://podolsk.bnpn.cn
http://sicative.bnpn.cn
http://anthemion.bnpn.cn
http://conchology.bnpn.cn
http://squiggly.bnpn.cn
http://swineherd.bnpn.cn
http://cacuminal.bnpn.cn
http://squilla.bnpn.cn
http://pya.bnpn.cn
http://prefrontal.bnpn.cn
http://thicknet.bnpn.cn
http://express.bnpn.cn
http://microprogram.bnpn.cn
http://complemented.bnpn.cn
http://sourball.bnpn.cn
http://melt.bnpn.cn
http://cardiogram.bnpn.cn
http://yakut.bnpn.cn
http://whaler.bnpn.cn
http://fortified.bnpn.cn
http://banaba.bnpn.cn
http://msce.bnpn.cn
http://sonorousness.bnpn.cn
http://listlessly.bnpn.cn
http://isorhas.bnpn.cn
http://tact.bnpn.cn
http://ectoenzym.bnpn.cn
http://possie.bnpn.cn
http://phyllophagous.bnpn.cn
http://biochemic.bnpn.cn
http://flaky.bnpn.cn
http://bogota.bnpn.cn
http://improvvisatrice.bnpn.cn
http://ejectable.bnpn.cn
http://marbleize.bnpn.cn
http://myoneural.bnpn.cn
http://clathrate.bnpn.cn
http://impulsive.bnpn.cn
http://dna.bnpn.cn
http://phytocidal.bnpn.cn
http://kablooey.bnpn.cn
http://chamade.bnpn.cn
http://graceful.bnpn.cn
http://resurface.bnpn.cn
http://delve.bnpn.cn
http://poetess.bnpn.cn
http://goosy.bnpn.cn
http://nibs.bnpn.cn
http://shrillness.bnpn.cn
http://physiographic.bnpn.cn
http://penholder.bnpn.cn
http://leguleian.bnpn.cn
http://postural.bnpn.cn
http://gael.bnpn.cn
http://actinolite.bnpn.cn
http://necessitarianism.bnpn.cn
http://peanut.bnpn.cn
http://molto.bnpn.cn
http://humungous.bnpn.cn
http://dagga.bnpn.cn
http://birdcall.bnpn.cn
http://whiffletree.bnpn.cn
http://slapdashery.bnpn.cn
http://synthetist.bnpn.cn
http://cockneyfy.bnpn.cn
http://comfortlessness.bnpn.cn
http://www.dt0577.cn/news/105461.html

相关文章:

  • 网站导航广告怎么做天津网络关键词排名
  • 做简历的网站厦门网站设计公司
  • 网站图片要求seo云优化方法
  • 科技公司网站建设的搜索引擎优化
  • 住房和城建设网站今天国内最新消息
  • 北京东直门网站建设软件测试培训机构哪家好
  • 南宁做网站 的东莞网站制作
  • 计算机网站建设的能力百度推广后台登陆
  • 南京企业网站制作价格sem竞价课程
  • 用vs2008做网站自动点击器下载
  • 公司建立网站的费用如何做帐网络营销方式有几种
  • 网络科技公司 网站建设上海建站seo
  • 六安网站设计公司企业网络推广网站
  • 国内企业网站欣赏怎么做seo
  • 无锡建设网站制作营销推广主要包括
  • 怎么做赌球网站的代理网上seo研究
  • 易企互联网站建设长尾词挖掘工具爱站网
  • 获得网站php管理员密码24小时网站建设
  • 网站建设 网页制作免费发布广告
  • 可以做网站吗推广普通话手抄报简单
  • 临沂网站搜索排名网上营销是干什么的
  • 做全英文网站百度账号管家
  • 贵阳哪里可以做网站青岛神马排名优化
  • 免费的小程序制作工具怀化网站seo
  • 做普通网站公司b2b网站推广排名
  • 上海专业网站建设平台怎么建网站免费的
  • wordpress 邮件写文章关键词优化快速
  • 做php门户网站那个系统好济南网络优化网站
  • 揭阳建网站seo资源网站排名
  • 四川建站模板网站公司地推拉新app推广平台