当前位置: 首页 > news >正文

建设部网站拆除资质郑州网站顾问

建设部网站拆除资质,郑州网站顾问,短链接生成器手机版,做资料上哪个网站好版本 flink 1.16.0kafka 2.3 流程描述: flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。 问题描述&#xff1…

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:

在这里插入图片描述

  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

http://www.dt0577.cn/news/32344.html

相关文章:

  • 自媒体营销代理南京seo排名扣费
  • 梧州论坛蒙山廊坊优化技巧
  • 专门做辅助的扎金花网站郑州网络seo
  • 福田工作招聘公众号排名优化
  • 网站建设图片大小谷歌seo搜索优化
  • 有一个做5s壁纸的网站短视频询盘获客系统
  • 奥数辅导机构网站建设优化设计答案五年级下册
  • 做企业官网设计公司前景seo最新
  • 洛阳做网站找哪家好搜索引擎优化策略应该包括
  • 网站静态和伪静态意思爱链在线
  • 拼多多一键铺货软件南通seo
  • 网站制作综述惠州关键词排名提升
  • wap手机网站开发asp经验爱站网 关键词挖掘工具站长工具
  • 网站设计 术语play商店
  • 陕西建设厅证件查询网站搜狐视频
  • 阿里巴巴网站做销售方案广州网络推广定制
  • 网站服务器崩了怎么办seo代码优化
  • 彩票网站用什么软件做哈尔滨网络推广优化
  • 网页配色网站上海网络排名优化
  • 大创网武安百度seo
  • 开源网站系统西安百度推广优化
  • 在linux上做网站搭建成功的网络营销案例有哪些
  • WordPress 空间模板seo优化网
  • 网站建设公司首页百度如何做推广
  • 勒流网站建设玄幻小说排行榜百度风云榜
  • 优化网站有哪些方法爱上链外链购买平台
  • asp.net获取网站的域名北京百度推广优化排名
  • wordpress默认主题下载地址北京知名seo公司精准互联
  • 做商业网站需要注册公司吗百度搜索广告推广
  • 如何用模板建设网站我国的网络营销公司