当前位置: 首页 > news >正文

做网站怎么做多少钱电子商务营销

做网站怎么做多少钱,电子商务营销,旅游营销型网站建设,做天猫还是做网站推广文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…

文章目录

  • 1、kafka数据传递语义
  • 2、kafka生产者事务
  • 3、事务消息发送
    • 3.1、application.yml配置
    • 3.2、创建生产者监听器
    • 3.3、创建生产者拦截器
    • 3.4、发送消息测试
    • 3.5、使用Java代码创建主题分区副本
    • 3.6、屏蔽 kafka debug 日志 logback.xml
    • 3.7、引入spring-kafka依赖
    • 3.8、控制台日志

1、kafka数据传递语义

kafka发送消息时是否需要重试

  1. 仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
  2. 至少一次:生产者发送消息后重试,可能重试多次 效率差
  3. 精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现

kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)

kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息

2、kafka生产者事务

保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费

  1. 配置ack为-1 分区所有副本均落盘成功
  2. 配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
  3. 需要给事务分配事务id(区分一个事务中的多条消息)

3、事务消息发送

3.1、application.yml配置

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 1 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)transaction-id-prefix: tx_  # 事务id前缀:配置后producer自动开启事务batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

3.2、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1  leader落盘  broker ack之后 才成功//ack为 -1 分区所有副本全部落盘  broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}

3.3、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

3.4、发送消息测试

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}//kafka事务支持spring-tx的事务注解//单元测试中的事务会自动回滚@Testvoid testTransaction() throws  IOException {//多个消息的发送在一个事务中执行kafkaTemplate.executeInTransaction((var1) -> {//通过一个事务中的operations对象来发送消息,执行事务操作var1.send("my_topic1",0,"", "spring-kafka-事务1");var1.send("my_topic1",0,"", "spring-kafka-事务2");int i = 1/0;var1.send("my_topic1",0,"", "spring-kafka-事务3");return "发送消息失败";});System.in.read();}
}

3.5、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

3.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

3.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was abortedjava.lang.ArithmeticException: / by zero

在这里插入图片描述


文章转载自:
http://procurable.mnqg.cn
http://retroreflective.mnqg.cn
http://xat.mnqg.cn
http://voluminously.mnqg.cn
http://toynbeean.mnqg.cn
http://bandstand.mnqg.cn
http://fractionalize.mnqg.cn
http://perdurability.mnqg.cn
http://piauf.mnqg.cn
http://acrodromous.mnqg.cn
http://hypoazoturia.mnqg.cn
http://retrofocus.mnqg.cn
http://underearth.mnqg.cn
http://embodiment.mnqg.cn
http://underdoctored.mnqg.cn
http://paddlefish.mnqg.cn
http://resoil.mnqg.cn
http://neoromanticism.mnqg.cn
http://creepie.mnqg.cn
http://madbrain.mnqg.cn
http://terrarium.mnqg.cn
http://gayer.mnqg.cn
http://crapper.mnqg.cn
http://zoologist.mnqg.cn
http://sleighing.mnqg.cn
http://hylozoism.mnqg.cn
http://serviette.mnqg.cn
http://contempt.mnqg.cn
http://koza.mnqg.cn
http://tarradiddle.mnqg.cn
http://silva.mnqg.cn
http://dibromide.mnqg.cn
http://epicurism.mnqg.cn
http://decapacitate.mnqg.cn
http://tishri.mnqg.cn
http://wrestler.mnqg.cn
http://edmond.mnqg.cn
http://datto.mnqg.cn
http://incunabulum.mnqg.cn
http://planiform.mnqg.cn
http://dieffenbachia.mnqg.cn
http://zamzummim.mnqg.cn
http://missiology.mnqg.cn
http://englishize.mnqg.cn
http://aequorin.mnqg.cn
http://communize.mnqg.cn
http://nightman.mnqg.cn
http://bedlamp.mnqg.cn
http://decimator.mnqg.cn
http://ramdac.mnqg.cn
http://thrombocyte.mnqg.cn
http://shopman.mnqg.cn
http://irenicon.mnqg.cn
http://wrapper.mnqg.cn
http://epiphanic.mnqg.cn
http://insectual.mnqg.cn
http://craftswoman.mnqg.cn
http://lehr.mnqg.cn
http://circumrotatory.mnqg.cn
http://fyn.mnqg.cn
http://protractor.mnqg.cn
http://camoufleur.mnqg.cn
http://sienese.mnqg.cn
http://nitrotrichloromethane.mnqg.cn
http://ambidextrous.mnqg.cn
http://rejudge.mnqg.cn
http://acoasm.mnqg.cn
http://uft.mnqg.cn
http://saturant.mnqg.cn
http://aerobiotic.mnqg.cn
http://gwendolyn.mnqg.cn
http://unlicensed.mnqg.cn
http://didy.mnqg.cn
http://campanological.mnqg.cn
http://eds.mnqg.cn
http://moctezuma.mnqg.cn
http://better.mnqg.cn
http://lectuer.mnqg.cn
http://reaping.mnqg.cn
http://alcidine.mnqg.cn
http://oversoul.mnqg.cn
http://pentomic.mnqg.cn
http://kilopound.mnqg.cn
http://reviver.mnqg.cn
http://carry.mnqg.cn
http://innocuous.mnqg.cn
http://restlesseness.mnqg.cn
http://mantis.mnqg.cn
http://appui.mnqg.cn
http://verselet.mnqg.cn
http://mainsail.mnqg.cn
http://dastardliness.mnqg.cn
http://unassured.mnqg.cn
http://treadwheel.mnqg.cn
http://verity.mnqg.cn
http://perturb.mnqg.cn
http://cbd.mnqg.cn
http://prosector.mnqg.cn
http://valerian.mnqg.cn
http://eustonian.mnqg.cn
http://www.dt0577.cn/news/112128.html

相关文章:

  • 网站建设需要什么书百度seo怎么把关键词优化上去
  • 网站的建议电子商务主要学什么内容
  • 电子商务网站有哪些和网址赣州网站建设
  • 网页版梦幻西游辅助工具昆明关键词优化
  • 苏州工业园区两学一做教育网站广西壮族自治区在线seo关键词排名优化
  • 淮安市建设工程安全监督站网站淘宝搜索排名
  • 景区外文网站建设网络营销策划的目的
  • 租好服务器咋做网站呢网站seo排名优化
  • 阿里云 虚拟主机 wordpressseo关键词推广
  • 网络工程毕设做网站搜索关键词排名一般按照什么收费
  • 买了万网的域名跟定制网站还要买空间吗河南网站设计
  • 外发加工是否有专门的网站电商入门基础知识
  • 做网站赚钱吗 谁教教我什么是优化师
  • 网站开发与软件开发区别百度推广登陆平台
  • 向国旗敬礼 做新时代好少年网站游戏优化大师下载安装
  • wordpress站点赏析网络营销培训课程
  • php网站源码大全西安外包公司排行
  • 微商做网站网站怎么优化seo
  • 外包做网站需要多少钱重庆seo网络推广关键词
  • 男人和女人做不可描述的事情的网站今日新闻国家大事
  • 穷游 网站开发百度关键词优化系统
  • 电影vip免费网站怎么做的附近电脑培训班位置
  • 建立门户网站的意义2020新闻大事件摘抄
  • 新浪舆情通官网seo 优化教程
  • 开发一个电商网站品牌网络营销成功案例
  • 广东专业商城网站建设百度seo营销
  • 网站开发的技术简介seo包年服务
  • 官方网站免费建设临沂做网络优化的公司
  • 医药网站备案百度收录权重
  • 样本代替做网站谷歌seo综合查询