当前位置: 首页 > news >正文

网站所有权变更郑州网站建设公司排行榜

网站所有权变更,郑州网站建设公司排行榜,网站设计制作,黄石做网站联系kafka基本概念 producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。 consumer: 消费者,每个consumer属于一个特定的c…

kafka基本概念

producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。

consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,要指定消费者接受的消息的topic,该消费者只会接受该topic的消息。

topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。

broker: kafka集群包含一个或多个服务器,这些服务器就叫做broker。

本机安装kafka测试

安装kafka(mac下)

kafka下载: 从官网下载 kafka_2.13-2.7.0.tgz,直接解压即可。

本机测试kafka

1、进入到kafka的解压目录,输入命令启动zookeeper:

./bin/zookeeper-server-start.sh config/zookeeper.properties

复制

打开另一个终端输入命令启动kafka:

./bin/kafka-server-start.sh config/server.properties 

复制

2、服务启起来后,可以创建生产者和消费者了。 再打开另一个终端输入命令创建生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

复制

broker-list: 参数指定生产者所使用的broker localhost: 9092 参数表示broker,这个broker为本机(127.0.0.1),且使用的端口是kafka的默认端口号是9092 topic: 参数表示生产者生产的消息的topic 为 “test_topic”

最后再打开另一个终端创建消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

复制

bootstrap-server: 是指定consumer从哪里(broker)取出消息 topic: 指定消费者consumer取出的 topic 为“test_topic”的消息。 from-beginning: Kafka实际环境有可能会出现Consumer全部宕机,虽然基于Kafka的高可用特性,消费者群组中的消费者可以实现再均衡,所有Consumer不处理数据的情况很少,但是还是有可能会出现,此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。基于消费者订阅模式默认是无法实现的,因为只能订阅最新发送的数据。通过消费者命令行可以实现,只要在命令行中加上–from-beginning即可

3、都创建完了可以通过生产者输入消息,消费者来接收并显示消息,效果图如下:

springboot整合kafka(IDEA)

注意: kafka要是部署在服务器的话,本机就 要和服务器之间能ping通。

1、创建springboot项目:

2、创建两个类,分别为生产者和消费者 项目目录结构:

配置文件application.yml:(一般项目自动生成的是applicaiton.properties,但为了书写简便,改成yml)

spring:kafka:bootstrap-servers: 127.0.0.1:9092 #服务器的ip及端口,可以写多个,服务器之间用“:”间隔producer: #生产者配置key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: #消费者配置group-id: test #设置消费者的组idenable-auto-commit: true
# auto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

复制

springboot启动类入口,KafkaStudyApplication.java:

package com.study.kafka.kafka_study;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaStudyApplication { public static void main(String[] args) { SpringApplication.run(KafkaStudyApplication.class, args);}}

复制

TestKafkaProducerController.java:(生产者)

package com.study.kafka.kafka_study;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController     //定义这是一个控制器,可以通过浏览器访问
@RequestMapping("/kafka")
public class TestKafkaProducerController { @Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//当在浏览器上输入http://localhost:8080/kafka/send?msg=abc,就会发送abc到服务器上去让消费者接收,msg对应下面的String msg
@RequestMapping("/producerSend")
public String send(String msg){ kafkaTemplate.send("test_topic", msg); //使用kafka模板发送信息
return "success";
}
}

复制

TestConsumer.java:(消费者)

package com.study.kafka.kafka_study;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class TestConsumer { /** * 定义此消费者接收topic为“test_topic”的消息,监听服务器上的kafka是否有相关的消息发过来 * @param record record变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息 * */
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

复制

测试

1、运行KafkaStudyApplication.java之后,终端上输入消息时,不仅终端上(服务器)运行的测试消费者能收到,IDEA上的程序也能收到。

2、在浏览器上输入http://localhost:8080/kafka/producerSend?msg=web world31231,不仅IDEA上的消费者能收到,在终端(服务器)上运行的测试消费者也能收到:(其中8080是tomcat服务器的端口,springboot默认下带的是tomcat)

http://www.dt0577.cn/news/10742.html

相关文章:

  • 佛山建站网站模板自贡网站seo
  • 做it的兼职网站有哪些安卓优化大师旧版本下载
  • 中央人事任免令seo关键词搜索优化
  • 深圳教育科技网站建设站长统计幸福宝下载
  • 建设网站的优点跟缺点站长之家备案查询
  • wordpress首页分类seo观察网
  • 班级响应式网站html格式注册公司网站
  • 手机端网站开发信息推广平台
  • 做网站界面一般用什么来做外包公司到底值不值得去
  • 如何对上传的网站做代码修改关键词你们都搜什么
  • 刷钻网站推广免费网页设计与制作作业成品
  • 手机建设网站目的泰州百度seo
  • 城子河网站建设品牌策划案例
  • 做旅游网站图片哪里找怎么让百度搜出自己
  • 网站移动端和PC端自适应怎么做十大搜索引擎排名
  • 无码一级a做爰片免费网站什么是关键词广告
  • 简单的静态网站网站创建
  • 大连市城乡建设委员会官网南京seo优化
  • 莆田网站建设开发网页设计流程步骤
  • 直播做网站网站制作的费用
  • 微信建立免费网站搜狗站长平台验证网站
  • 做网站买服务器怎么样网站友情链接检测
  • 网站后台管理 源码外贸接单网站
  • 东莞网站视频江苏seo团队
  • 鹤岗市建设局网站免费行情网站大全搜狐网
  • 做网站建设工资高吗seo优化收费
  • 网站想换个风格怎么做赣州seo唐三
  • wordpress主题视频站微博指数
  • 进入深圳市住房和建设局网站网页优化方案
  • 大庆市让胡路区规划建设局网站百度自动点击器怎么用