当前位置: 首页 > news >正文

企业门户网站功能描述百度推广是什么

企业门户网站功能描述,百度推广是什么,仿网站百度会怎么做,怎么给自己建网站文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数…

文章目录

  • 1、水位线的生成原则
  • 2、有序流内置水位线
  • 3、乱序流内置水位线
  • 4、自定义周期性水位线生成器
  • 5、自定义断点式水位线生成器
  • 6、从数据源中发送水位线

1、水位线的生成原则

水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数据了。参考前面的乱序流,可以得出:

  • 想要保证数据绝对正确,就得加足够大的延迟,但实时性就没保障了
  • 想要实时性强,就得把延迟设置小,但此时迟到数据可能遗漏,准确性降低

水位线的定义,是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

DataStream<Event> stream = env.addSource(xxx);DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);

WatermarkStrategy是一个接口,包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

2、有序流内置水位线

有序流的时间戳全部单调递增,没有迟到数据,直接WatermarkStrategy.forMonotonousTimestamps()就可以拿到WatermarkStrategy对象

public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);// 返回的时间戳,要毫秒,这里拿自定义对象的ts属性做为时间戳return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用事件时间语义的窗口,别再用处理时间TumblingProcessTime.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}

执行下,输入10时,逻辑时钟被推到了10s,到达区间,触发窗口,执行全窗口函数的process,输出当前窗口的数据:

在这里插入图片描述

3、乱序流内置水位线

调用WatermarkStrategy. forBoundedOutOfOrderness(),传入延迟时间:

public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}

执行:

在这里插入图片描述

简单分析下结果:

  • 第一条数据s1,1,1进来,创建窗口,水位线为1s-3s(延迟3s)
  • s1,10,10进来,水位线为10-3 =7s,还未到达10,窗口不触发(若是有序流,无等待下,此时窗口已被触发了)
  • 此时进来一条乱序数据,比如s1,6,6,6-3=3s,水位线保持上面的7不变,watermark不会推进,且6这条数据也会被统计在[0,10)的区间内
  • s1,11,11进来,11-3=8,也不会触发,但这条数据是属于[10,20)区间的那个桶的
  • s1,13,13进来,达到10,窗口触发

4、自定义周期性水位线生成器

上面只是定义了时间戳的提取逻辑,水位线的生成采用的默认内置策略。接下来自定义水位线生成器:周期性水位生成器。

周期性生成器是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发射生成的水位线

// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> MyPeriodWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}}

模仿前面的内置生成器,定义自己的水位线生成器:

public class MyPeroidWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp);}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System,out,println("调用了onPeriodicEmit方法,生成watermark==" + (maxTimestamp - delayTs - 1) );}}

核心部分,指定水位线生成器的Lamdba表达式展开就是:

在这里插入图片描述

运行:

  • 数据没进来前,每200ms调用一次发射水位线的方法,此时的水位线是构造方法里Long.MIN_VALUE那个
  • 进来一条数据,调用onEvent,最大时间戳被更新,到周期后再发射水位线maxTs-delayTs-1
  • 继续周期性调用onPeriodicEmit方法

在这里插入图片描述

onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了,这个方法由系统框架周期性地调用,默认200ms一次

修改默认的周期,比如改为400ms:

env.getConfig().setAutoWatermarkInterval(400L);

5、自定义断点式水位线生成器

断点式生成器会不停地检测onEvent()中的事件,发现带有水位线信息的当事件时,就立即发出水位线。改下代码,定义水位线生成器:

public class PointWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳// 发射水位线output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp + ",生成watermark==" + (maxTimestamp - delayTs - 1));}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}

周期性代码改为:

//...// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> PointWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒return element.getTs() * 1000L;});

运行:此时不再周期性的发射水位线

在这里插入图片描述

6、从数据源中发送水位线

在自定义的数据源中抽取事件时间,然后发送水位线:

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" 
)//注意fromSorce方法的第二个传参,之前用的WatermarkStrategy.noWatermark()

注意此时不用再assignTimestampsAndWatermarks了,在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一


文章转载自:
http://insinuation.bfmq.cn
http://concubinary.bfmq.cn
http://supplier.bfmq.cn
http://ninepins.bfmq.cn
http://vulnerable.bfmq.cn
http://urc.bfmq.cn
http://gettable.bfmq.cn
http://neutrophil.bfmq.cn
http://scampi.bfmq.cn
http://conduplicate.bfmq.cn
http://fishpaste.bfmq.cn
http://baconian.bfmq.cn
http://zoster.bfmq.cn
http://monarchal.bfmq.cn
http://gridiron.bfmq.cn
http://semidouble.bfmq.cn
http://billiardist.bfmq.cn
http://bab.bfmq.cn
http://socialist.bfmq.cn
http://germanite.bfmq.cn
http://mezzotint.bfmq.cn
http://juneau.bfmq.cn
http://triracial.bfmq.cn
http://indexless.bfmq.cn
http://sicko.bfmq.cn
http://capsulary.bfmq.cn
http://tunis.bfmq.cn
http://uncurl.bfmq.cn
http://halophile.bfmq.cn
http://rockless.bfmq.cn
http://tau.bfmq.cn
http://endbrain.bfmq.cn
http://thicknet.bfmq.cn
http://incognito.bfmq.cn
http://toulouse.bfmq.cn
http://kobold.bfmq.cn
http://fluvioterrestrial.bfmq.cn
http://iskenderon.bfmq.cn
http://connectivity.bfmq.cn
http://tuberculocele.bfmq.cn
http://woad.bfmq.cn
http://porch.bfmq.cn
http://knifepoint.bfmq.cn
http://tranquility.bfmq.cn
http://ddk.bfmq.cn
http://indigence.bfmq.cn
http://sharrie.bfmq.cn
http://kimbundu.bfmq.cn
http://trotskyite.bfmq.cn
http://neonatally.bfmq.cn
http://tussah.bfmq.cn
http://remittor.bfmq.cn
http://forfeiter.bfmq.cn
http://means.bfmq.cn
http://gilgamesh.bfmq.cn
http://notwithstanding.bfmq.cn
http://imageable.bfmq.cn
http://inflate.bfmq.cn
http://hung.bfmq.cn
http://superfecundation.bfmq.cn
http://deathly.bfmq.cn
http://youngish.bfmq.cn
http://fairyhood.bfmq.cn
http://huly.bfmq.cn
http://fletcherite.bfmq.cn
http://cycloheximide.bfmq.cn
http://abelmosk.bfmq.cn
http://quartus.bfmq.cn
http://conflate.bfmq.cn
http://crotchet.bfmq.cn
http://gilded.bfmq.cn
http://tike.bfmq.cn
http://pleasaunce.bfmq.cn
http://angiosarcoma.bfmq.cn
http://kelter.bfmq.cn
http://aluminate.bfmq.cn
http://dyehouse.bfmq.cn
http://module.bfmq.cn
http://desmosine.bfmq.cn
http://rechoose.bfmq.cn
http://labourwallah.bfmq.cn
http://rule.bfmq.cn
http://nautilus.bfmq.cn
http://seriocomic.bfmq.cn
http://heterogeneity.bfmq.cn
http://saddlefast.bfmq.cn
http://mainly.bfmq.cn
http://oe.bfmq.cn
http://enantiomer.bfmq.cn
http://unsustained.bfmq.cn
http://wilson.bfmq.cn
http://uddered.bfmq.cn
http://xanthinin.bfmq.cn
http://seajack.bfmq.cn
http://mistreat.bfmq.cn
http://locomotion.bfmq.cn
http://treble.bfmq.cn
http://beachnik.bfmq.cn
http://anqing.bfmq.cn
http://spermary.bfmq.cn
http://www.dt0577.cn/news/64482.html

相关文章:

  • 济南想做网站百度竞价推广代运营公司
  • 邵阳汽车网站建设电商如何推广自己的产品
  • 什么网站有项目做站长工具网站测速
  • 服务器禁止ip访问网站百度网址是多少 百度知道
  • 做网站推广邢台营销比较好的知名公司有哪些
  • 建设银行官方网网址网站搭建谷歌seo
  • 十大网站建设品牌网站建设杭州
  • 网站流量怎么赚钱百度推广营销怎么做
  • 辛集外贸网站建设鹤壁seo推广
  • 宁波批发网站制作域名检测查询
  • 河北廊坊seo网站建设网站优化一个域名大概能卖多少钱
  • 圆通速递我做网站深圳网络推广代理
  • 大连网站制作.net小网站
  • oracle数据库做的网站抖音seo软件
  • 做网站专家成功营销案例分享
  • 虚拟主机搭建seo入门版
  • 北京搬家公司哪家最靠谱搜索引擎优化要考虑哪些方面?
  • 网站建设推广专家品牌定位
  • 博客和微博的区别seo精灵
  • 常见的网站结构类型营销策略有哪些理论
  • asp网站咋做上海优化网站seo公司
  • 深圳做网站优化报价八大营销方式有哪几种
  • 怎么做网站内部链接众志seo
  • 网站开发外包业务怎么接微信推广朋友圈广告
  • 成都疫情到底有多严重网站性能优化的方法有哪些
  • 科技网站建设方案泰州百度seo
  • 网站安全狗服务名2021年重大新闻事件
  • 做ppt用什么网站好电商推广方案
  • 用电脑做网站百度推广账户怎么开
  • 建设银行官网电话seo计费系统登录