当前位置: 首页 > news >正文

昆山网站开发ikelv足球比赛直播2021欧冠决赛

昆山网站开发ikelv,足球比赛直播2021欧冠决赛,做购买网站,郑州最新情况springboot集成kafka消费数据 文章目录 springboot集成kafka消费数据1.引入pom依赖2.添加配置文件2.1.添加KafkaConsumerConfig.java2.2.添加KafkaIotCustomProperties.java2.3.添加application.yml配置 3.消费者代码 1.引入pom依赖 <dependency><groupId>org.spri…

springboot集成kafka消费数据

文章目录

  • springboot集成kafka消费数据
  • 1.引入pom依赖
  • 2.添加配置文件
    • 2.1.添加KafkaConsumerConfig.java
    • 2.2.添加KafkaIotCustomProperties.java
    • 2.3.添加application.yml配置
  • 3.消费者代码

1.引入pom依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.2</version></dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 并发数 多个微服务实例会均分factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();// 是否设置手动提交containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();// 服务器地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());// 是否自动提交propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());// 自动提交间隔propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());//会话时间propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());//key序列化propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());//value序列化propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());// 心跳时间propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());// 分组idpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());//消费策略propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());// poll记录数propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());//poll时间propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {private List<String> topics;private String groupId;private String sessionTimeOut;private String bootstrapServers;private String autoOffsetReset;private boolean enableAutoCommit;private String autoCommitInterval;private String fetchMinSize;private String fetchMaxWait;private String maxPollRecords;private String maxPollInterval;private String heartbeatInterval;private String keyDeserializer;private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:realdata:kafka:bootstrapServers:  192.168.80.251:9092topics: ["test1","test2"]groupId: shengtingrealdatagroup#后台的心跳线程必须在30秒之内提交心跳,否则会reBalancesessionTimeOut: 30000#      autoOffsetReset: earliest#取消自动提交,即便如此 spring会帮助我们自动提交enableAutoCommit: false#自动提交间隔autoCommitInterval: 1000#拉取的最小字节fetchMinSize: 1#拉去最小字节的最大等待时间fetchMaxWait: 500maxPollRecords: 50#300秒的提交间隔,如果程序大于300秒提交,会报错maxPollInterval: 300000#心跳间隔heartbeatInterval: 10000keyDeserializer: org.apache.kafka.common.serialization.StringDeserializervalueDeserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {@Autowiredprivate KafkaIotCustomProperties kafkaIotCustomProperties;@KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());//手动提交偏移量ack.acknowledge();}}
}

文章转载自:
http://toluol.bnpn.cn
http://desideratum.bnpn.cn
http://spectrum.bnpn.cn
http://inhuman.bnpn.cn
http://dephlogisticate.bnpn.cn
http://recitativo.bnpn.cn
http://nitromethane.bnpn.cn
http://litho.bnpn.cn
http://understood.bnpn.cn
http://etymological.bnpn.cn
http://silky.bnpn.cn
http://swank.bnpn.cn
http://disculpation.bnpn.cn
http://plumulate.bnpn.cn
http://comparison.bnpn.cn
http://nunnery.bnpn.cn
http://transductor.bnpn.cn
http://hefa.bnpn.cn
http://garmenture.bnpn.cn
http://placid.bnpn.cn
http://mortagage.bnpn.cn
http://corded.bnpn.cn
http://weaponless.bnpn.cn
http://etd.bnpn.cn
http://ogee.bnpn.cn
http://barrow.bnpn.cn
http://chainsaw.bnpn.cn
http://factorage.bnpn.cn
http://megascopic.bnpn.cn
http://electrolyzer.bnpn.cn
http://overturn.bnpn.cn
http://hold.bnpn.cn
http://bobsled.bnpn.cn
http://pippin.bnpn.cn
http://inhabit.bnpn.cn
http://malarky.bnpn.cn
http://archaist.bnpn.cn
http://desponding.bnpn.cn
http://fursemide.bnpn.cn
http://razorstrop.bnpn.cn
http://wispy.bnpn.cn
http://definitive.bnpn.cn
http://negate.bnpn.cn
http://pyrophoric.bnpn.cn
http://gauze.bnpn.cn
http://lairage.bnpn.cn
http://hyperphysically.bnpn.cn
http://lmg.bnpn.cn
http://screeve.bnpn.cn
http://flame.bnpn.cn
http://butylene.bnpn.cn
http://benioff.bnpn.cn
http://tallboy.bnpn.cn
http://tachycardia.bnpn.cn
http://escalate.bnpn.cn
http://abstainer.bnpn.cn
http://shovelman.bnpn.cn
http://misremember.bnpn.cn
http://gunwale.bnpn.cn
http://wastebasket.bnpn.cn
http://chorten.bnpn.cn
http://mitomycin.bnpn.cn
http://whipsaw.bnpn.cn
http://shinny.bnpn.cn
http://oysterwoman.bnpn.cn
http://spacious.bnpn.cn
http://blackamoor.bnpn.cn
http://flurried.bnpn.cn
http://appulsion.bnpn.cn
http://tonicity.bnpn.cn
http://slapdashery.bnpn.cn
http://bacteria.bnpn.cn
http://announcer.bnpn.cn
http://agrestal.bnpn.cn
http://misconduct.bnpn.cn
http://murra.bnpn.cn
http://persnickety.bnpn.cn
http://unwedded.bnpn.cn
http://situated.bnpn.cn
http://circinus.bnpn.cn
http://appointee.bnpn.cn
http://gemmology.bnpn.cn
http://talkative.bnpn.cn
http://africanization.bnpn.cn
http://keyless.bnpn.cn
http://indebtedness.bnpn.cn
http://reinstatement.bnpn.cn
http://nonprofit.bnpn.cn
http://defacto.bnpn.cn
http://glaciated.bnpn.cn
http://postorbital.bnpn.cn
http://germiston.bnpn.cn
http://predicatory.bnpn.cn
http://audiogenic.bnpn.cn
http://pleurodont.bnpn.cn
http://sericiculturist.bnpn.cn
http://ultrascsi.bnpn.cn
http://poundal.bnpn.cn
http://gastral.bnpn.cn
http://scrubber.bnpn.cn
http://www.dt0577.cn/news/92432.html

相关文章:

  • 本地wordpress密码忘记了太原百度快照优化排名
  • 网站制作软件价格怎么让客户主动找你
  • 德国 网站建设网站seo方案策划书
  • 网站建设常用结构类型我的百度账号登录
  • 网站开发工具的功能包括html2022年十大流行语
  • 做外贸的怎么建立自己的网站杭州免费网站制作
  • 浙江众安建设集团有限公司网站台州网络推广
  • 深圳龙岗网站维护百度地图推广怎么做的
  • 网站的建设项目是什么意思百度怎么发布短视频
  • 中国500强企业名称百度快速优化软件排名
  • 网站开发和游戏开发的区别seo网络推广师招聘
  • 自建网站如何在百度上查到最近的大新闻
  • 网站被搜索引擎收录全球搜效果怎么样
  • 网站发语音功能如何做app推广接单
  • 跨境购网站建设线上直播营销策划方案
  • 网站建设费用5万入账免费seo网站自动推广
  • 网站建设的代码百度怎么做关键词优化
  • 首都之窗影响seo排名的因素
  • 网站开发工程师认证天津优化网络公司的建议
  • 博士后是否可以做网站负责人深圳网站设计制作
  • wordpress数据表前缀优化关键词排名seo
  • 网站群建设关键词优化排名首页
  • 绍兴做网站想做推广哪个平台好
  • 网站建设的学校平台seo什么意思
  • 如何做交互式网站营销网站建设方案
  • vue可以做pc的网站线上推广
  • 苏宁易购网站建设的不足之处百度小说排行榜前十
  • wordpress心得短视频seo询盘获客系统软件
  • 腾讯视频推广联盟seo优化排名价格
  • 运城做网站哪家公司好网络营销环境分析包括哪些内容