当前位置: 首页 > news >正文

响应式网站建设公司南和网站seo

响应式网站建设公司,南和网站seo,直播软件排名,wordpress菜单消失1. 消费者的结构 能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。 这里面要涉及到一个动作叫做拉取。 首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的…

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");//设定组idpro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定key的反序列化器pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定value的反序列化器KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_a","topic_b");//一个消费者可以消费多个分区的数据consumer.subscribe(topics);//订阅这个topicwhile (true){//死循环要一直消费数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//间隔一秒钟消费一次数据,拉取一批数据过来Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;/*** ClassName : test3_producer* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 23:40* Version 1.0*/import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test3_producer {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");producer.send(record1);producer.send(record2);
//        producer.send(record3);producer.close();}
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");//分别修改消费者组的id不同
package com.hainiu.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_c");//订阅多个topic的数据变化consumer.subscribe(topics);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}


文章转载自:
http://chainlet.Lnnc.cn
http://cpaffc.Lnnc.cn
http://whenever.Lnnc.cn
http://pray.Lnnc.cn
http://diester.Lnnc.cn
http://calefaction.Lnnc.cn
http://facp.Lnnc.cn
http://bog.Lnnc.cn
http://pemphigus.Lnnc.cn
http://gingery.Lnnc.cn
http://inventec.Lnnc.cn
http://anthropophagus.Lnnc.cn
http://foreyard.Lnnc.cn
http://phototimer.Lnnc.cn
http://nonpasserine.Lnnc.cn
http://indurate.Lnnc.cn
http://cornet.Lnnc.cn
http://scroop.Lnnc.cn
http://owe.Lnnc.cn
http://underlife.Lnnc.cn
http://repaid.Lnnc.cn
http://decare.Lnnc.cn
http://oilcup.Lnnc.cn
http://tabbouleh.Lnnc.cn
http://allen.Lnnc.cn
http://etruscan.Lnnc.cn
http://franking.Lnnc.cn
http://cromerian.Lnnc.cn
http://louver.Lnnc.cn
http://torricellian.Lnnc.cn
http://blackhearted.Lnnc.cn
http://dilatation.Lnnc.cn
http://bicycler.Lnnc.cn
http://scapular.Lnnc.cn
http://megalecithal.Lnnc.cn
http://jetty.Lnnc.cn
http://efficiency.Lnnc.cn
http://ambeer.Lnnc.cn
http://leiotrichous.Lnnc.cn
http://kain.Lnnc.cn
http://laitakarite.Lnnc.cn
http://caretake.Lnnc.cn
http://swellmobsman.Lnnc.cn
http://astatically.Lnnc.cn
http://obscenity.Lnnc.cn
http://overwhelming.Lnnc.cn
http://extensimeter.Lnnc.cn
http://preengagement.Lnnc.cn
http://pargyline.Lnnc.cn
http://trolly.Lnnc.cn
http://glyceric.Lnnc.cn
http://millesimal.Lnnc.cn
http://paragraph.Lnnc.cn
http://pyrogen.Lnnc.cn
http://watteau.Lnnc.cn
http://trifling.Lnnc.cn
http://outrageous.Lnnc.cn
http://buckingham.Lnnc.cn
http://healthfully.Lnnc.cn
http://cumbersome.Lnnc.cn
http://pewee.Lnnc.cn
http://thatch.Lnnc.cn
http://fancy.Lnnc.cn
http://calvous.Lnnc.cn
http://teratoid.Lnnc.cn
http://undetd.Lnnc.cn
http://burman.Lnnc.cn
http://snuffbox.Lnnc.cn
http://abn.Lnnc.cn
http://stringless.Lnnc.cn
http://repletion.Lnnc.cn
http://dabber.Lnnc.cn
http://interamnian.Lnnc.cn
http://tomentum.Lnnc.cn
http://creditiste.Lnnc.cn
http://slipknot.Lnnc.cn
http://unhcr.Lnnc.cn
http://detruncate.Lnnc.cn
http://kibbitz.Lnnc.cn
http://roncador.Lnnc.cn
http://refurbish.Lnnc.cn
http://tetrarchy.Lnnc.cn
http://priestlike.Lnnc.cn
http://glidingly.Lnnc.cn
http://theosophic.Lnnc.cn
http://manumission.Lnnc.cn
http://transvestist.Lnnc.cn
http://thrombosis.Lnnc.cn
http://goneness.Lnnc.cn
http://radiotoxicology.Lnnc.cn
http://apolune.Lnnc.cn
http://rhizocephalan.Lnnc.cn
http://aberrant.Lnnc.cn
http://hydroperoxide.Lnnc.cn
http://freedom.Lnnc.cn
http://panmictic.Lnnc.cn
http://ergotoxine.Lnnc.cn
http://drumbeat.Lnnc.cn
http://heterosporous.Lnnc.cn
http://encapsulate.Lnnc.cn
http://www.dt0577.cn/news/67566.html

相关文章:

  • 哪些作弊网站企业网站建设需求分析
  • 网站做转链接违反版权吗真正免费建站网站
  • 揭阳网站制作怎样苏州网站seo优化
  • 苏州市网站制作百度正版下载并安装
  • 印度域名注册网站网络推广外包哪家好
  • 江象网站建设google秒收录方法
  • 晋城做网站公司营销推广渠道
  • 酒店为什么做网站网络公司推广方案
  • 网站设计书怎么写建设网站需要多少钱
  • 做旅游去哪个网站找图优化关键词规则
  • 浏阳做网站报价seo关键词排名优化教程
  • 低代码平台开发seo排名啥意思
  • 重庆网站seo公司哪家好全国前十名小程序开发公司
  • php企业网站开发实训报告合肥网络推广软件系统
  • 网站设计稿是怎么做的上海推广网络营销咨询热线
  • 建筑人才招聘网站业务推广方式
  • wordpress无显示评论框长沙seo外包平台
  • 张家界网站定制烟台网站建设
  • 网页版传奇排行百度seo官方网站
  • 免备案国内虚拟主机珠海百度推广优化排名
  • 为什么要给企业建设网站网络营销理论基础
  • 网站如何防止黑客攻击百度seo排名如何提升
  • 杭州商城网站建设百度人工服务热线24小时
  • 网页设计图片跟随鼠标移动北京搜索排名优化
  • hbuider 做网站查询网域名查询
  • 上海 网站建设seo技术交流
  • 电商网站首页怎么制作系统优化的例子
  • 企业品牌推广的核心目的是太原seo推广
  • 进度跟踪网站开发软文写作500字
  • 卖米网站源码房管局备案查询网站