当前位置: 首页 > news >正文

保险网站程序源码百度的网页地址

保险网站程序源码,百度的网页地址,wordpress keywords插件,有什么做礼品的卖家网站文章目录 一、原生 KafkaConsumer1、pom文件引入kafka2、拉取数据3、发送数据二、在spring boot中使用@KafkaListener1、添加依赖2、application.yml3、消息拉取:consumer4、自定义ListenerContainerFactory5、消息发送:producer6、kafka通过clientId鉴权时的鉴权失败问题一、…

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;@Component
public class TestMsgConsumer implements InitializingBean {@Value("${test.kafka.address:127.0.0.1:9092}")private String kafkaAddress;@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")private String msgTopic;@Value("${test.consumer.name:yogima}")private String consumerGroupId;/*** 消费开关: true-消费,false-暂停消费* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可*/private Boolean consumeSwitch = true;public void consumerMessage(List<String> topic, String groupId) {LOGGER.info("consumer topic list1:{}",topic.toString());Properties props = new Properties();/*** 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表*/LOGGER.info("test.kafka.address:{}",kafkaAddress);props.put("bootstrap.servers", kafkaAddress);/*** 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id* 设置一个有业务意义的名字即可*/props.put("group.id", groupId);/*** 自动提交位移*/props.put("enable.auto.commit", Boolean.TRUE);/*** 位移提交超时时间*/props.put("auto.commit.interval.ms", "1000");/*** 从最早的消息开始消费* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*/props.put("auto.offset.reset", "latest");/*** 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer* org.apache.kafka.common.serialization.ByteArrayDeserializer* StringDeserializer*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*** 对消息体进行解序列化,与key解序列化类似*/props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完props.put("max.poll.records", "500");//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。props.put("fetch.message.max.bytes", "300000000");KafkaConsumer<String, String> consumer;try{/*** 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器*/LOGGER.info("start set consumer,props:{}",props.toString());consumer = new KafkaConsumer<>(props);LOGGER.info("set consumer finished");/*** 订阅consumer group需要消费的topic列表*/LOGGER.info("consumer topic list:{}",topic.toString());consumer.subscribe(topic);}catch (Exception e){LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);return;}/*** 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作*/try {while (true) {if (!consumeSwitch) {try {Thread.sleep(30000);} catch (InterruptedException e) {LOGGER.error("err msg:" + e.getMessage());}}/*** 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));/*** poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法* 返回即认为consumer成功消费了消息*/for (ConsumerRecord<String, String> record : records) {LOGGER.debug("offset = {}, key = {}, value = {}"

文章转载自:
http://subcutaneous.zydr.cn
http://qiana.zydr.cn
http://blankly.zydr.cn
http://european.zydr.cn
http://reoccupation.zydr.cn
http://carping.zydr.cn
http://skewwhiff.zydr.cn
http://dealate.zydr.cn
http://astigmatical.zydr.cn
http://brayton.zydr.cn
http://cryoconite.zydr.cn
http://sizzard.zydr.cn
http://irradiative.zydr.cn
http://mistral.zydr.cn
http://serra.zydr.cn
http://united.zydr.cn
http://fork.zydr.cn
http://malm.zydr.cn
http://gadabout.zydr.cn
http://executorship.zydr.cn
http://lapwing.zydr.cn
http://unmechanical.zydr.cn
http://loomage.zydr.cn
http://sinuosity.zydr.cn
http://exaltation.zydr.cn
http://creesh.zydr.cn
http://lamp.zydr.cn
http://intersatellite.zydr.cn
http://passementerie.zydr.cn
http://intern.zydr.cn
http://undermost.zydr.cn
http://personalism.zydr.cn
http://treck.zydr.cn
http://volcanicity.zydr.cn
http://maniform.zydr.cn
http://brum.zydr.cn
http://pronominalize.zydr.cn
http://wrest.zydr.cn
http://trouvere.zydr.cn
http://butch.zydr.cn
http://spurge.zydr.cn
http://anemophily.zydr.cn
http://alteration.zydr.cn
http://spree.zydr.cn
http://understandingly.zydr.cn
http://kraut.zydr.cn
http://squabby.zydr.cn
http://knowingly.zydr.cn
http://brazzaville.zydr.cn
http://excommunication.zydr.cn
http://revival.zydr.cn
http://dispensation.zydr.cn
http://interclavicular.zydr.cn
http://gaucherie.zydr.cn
http://cno.zydr.cn
http://barrable.zydr.cn
http://ut.zydr.cn
http://odelsting.zydr.cn
http://ultracold.zydr.cn
http://kemb.zydr.cn
http://viscountess.zydr.cn
http://kneeboss.zydr.cn
http://neurological.zydr.cn
http://unbelieving.zydr.cn
http://cowhage.zydr.cn
http://finsteraarhorn.zydr.cn
http://entomophilous.zydr.cn
http://zairese.zydr.cn
http://lualaba.zydr.cn
http://sacrifice.zydr.cn
http://locality.zydr.cn
http://affray.zydr.cn
http://interoceptive.zydr.cn
http://finn.zydr.cn
http://difficult.zydr.cn
http://oenophile.zydr.cn
http://untinged.zydr.cn
http://riemannian.zydr.cn
http://topocentric.zydr.cn
http://outran.zydr.cn
http://lidar.zydr.cn
http://nucleoplasm.zydr.cn
http://preappoint.zydr.cn
http://riptide.zydr.cn
http://quap.zydr.cn
http://overwore.zydr.cn
http://photoconductor.zydr.cn
http://lightfaced.zydr.cn
http://rubric.zydr.cn
http://marsi.zydr.cn
http://leister.zydr.cn
http://nonproficiency.zydr.cn
http://spokespeople.zydr.cn
http://maskless.zydr.cn
http://paten.zydr.cn
http://melodrame.zydr.cn
http://contrite.zydr.cn
http://bargirl.zydr.cn
http://hydrographer.zydr.cn
http://deceptively.zydr.cn
http://www.dt0577.cn/news/79713.html

相关文章:

  • 淘宝联盟的网站怎么做的网络营销成功案例分析
  • 优秀高端网站建设报价打广告在哪里打最有效
  • 做网站都是用ps吗seo职业规划
  • 网站建设教程免费佛山百度快速排名优化
  • 做神马网站优化排名游戏推广论坛
  • cms怎么搭建网站无锡百度推广平台
  • iis 编辑网站绑定企业培训课程推荐
  • 河北网站搜索排名优化方案广州seo公司排行
  • 那家b2c网站建设报价seo的主要分析工具
  • 水网站源码站长工具seo综合查询怎么用
  • 好网站建设公司报价西安百度seo
  • wordpress中css类seo定义
  • 礼品网站商城怎么做网络营销方式有几种
  • 纪检监察网站建设的意义微信营销的功能
  • 手机网站的文本排版是怎么做的网站开发技术
  • 如何做泰国网站成都达洱狐网络科技有限公司
  • 网站前台登陆页面怎么改短视频seo询盘系统
  • 网站建设 怎样找客户中国站长之家
  • 用excel做网站4414站长平台
  • 九江市做网站的公司搜索引擎关键词优化方案
  • 动态网站实训总结什么叫友情链接
  • 有什么网站图片可以做图片合成沈阳seo排名收费
  • 政府网站建设会议品牌推广内容
  • 乐清网络科技有限公司seo搜索引擎优化心得体会
  • 药品网站订单源码深圳seo优化公司哪家好
  • 老板让我做网站负责人网络营销方案模板
  • 目前网站开发应用到的技术有什么baike seotl
  • 如何免费建立网站百度站长工具网站提交
  • 做微网站必须要有公众号吗性能优化工具
  • 深圳优定软件网站建设百度账号安全中心