当前位置: 首页 > news >正文

网络系统工程师百度seo排名

网络系统工程师,百度seo排名,专业网站定制团队,公司网站的功能1 窗口触发机制 窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下: Trigger ProcessingTimeoutTriggerEventTimeTriggerCountTriggerDeltaTriggerNeverTrigger in GlobalWindowsContinuousEventTimeTrigge…

1 窗口触发机制

窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下:
  • Trigger
  • ProcessingTimeoutTrigger
  • EventTimeTrigger
  • CountTrigger
  • DeltaTrigger
  • NeverTrigger in GlobalWindows
  • ContinuousEventTimeTrigger
  • PurgingTrigger
  • ContinuousProcessingTimeTrigger
  • ProcessingTimeTrigger
通常情况下是不需要自己重写Trigger的,使用Flink内置的就可以,除非特殊业务特殊需求.
1.1 源码解析

EventTimeTrigger源码说明如何触发窗口计算,在EventTimeTrigger源码中只需要关注onElementonEventTime两个方法即可,源码内容如下:

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}// 基于数据驱动的方法@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {// 判断当前watermark是否大于等于窗口的最大时间if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 如果大于等于窗口的最大时间触发计算return TriggerResult.FIRE;} else {// 小于窗口的最大时间首先注册定时器ctx.registerEventTimeTimer(window.maxTimestamp());// 然后等待数据继续输入,不触发计算return TriggerResult.CONTINUE;}}// 基于事件时间定时器驱动的方法@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {// 根据不断发送来的watermark判断是否触发计算return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}// ...
}

源码中将不需要的关注的代码都已省略

  • onElement

    注释中写明这个方法是基于数据进行驱动的,也就是说只有数据到达时才会执行这个方法,每一个窗口都有自己的startTimeendTime,也就是窗口的范围,判断条件中window.maxTimestamp()获取的就是当前窗口的endTime,如果当前watermark超出当前窗口的endTime就会触发这个窗口计算,TriggerResult.FIRE表示的就是窗口开始计算,如果当前watermark小于endTime就不会触发窗口计算这个窗口会继续等待数据输入,也就是TriggerResult.CONTINUE方法.

  • onEventTime

    onElement是由数据驱动的,但是Flink的实际数据处理过程是存在没有数据发送到当前窗口,但是会有watermark源源不断的发送到当前窗口的情况,在多并行度的执行条件下就会发生这种情况.在onEventTime方法中如果上游发送过来的watermark等于当前窗口的endTime就会执行TriggerResult.FIRE否则还是执行TriggerResult.CONTINUE.

Trigger的触发机制就是这样,其他的CountTrigger等大致逻辑基本是一样的,了解清楚源码中这两个方法的作用很容易理解.

1.2 代码实现

通常Flink内置的Trigger都可以满足数据处理需求,往往在实际开发中可能会存在特殊的业务需求,这时用户可以自定义Trigger,以达到控制窗口触发计算的规则. 可以仿照EventTimeTrigger来构建一个自定义Trigger,只需要将其中的部分代码简单进行修改,并在onElement方法中添加自定的触发逻辑即可.
  • 自定义Trigger

    /*** 这里首先需要继承Trigger类,并将<Object, TimeWindow>中的Object修改成自己需要的数据类型,这段代码中需要根据UserEvent2中的数据* 来控制触发窗口计算的条件,所以将Object修改成UserEvent2**/ 
    public class CustomTrigger extends Trigger<UserEvent2, TimeWindow> {public CustomTrigger() {}// 通过修改onElement方法中窗口计算的触发逻辑实现自定义方式@Overridepublic TriggerResult onElement(// 这里也要将原有的Object类型修改成上面的UserEvent2UserEvent2 element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {// 原有的判断逻辑不动,这个是为了便捷,判断逻辑可以根据实际需求进行修改,或者如同下面中添加一个新的触发逻辑if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;// 这里增加一个判断逻辑,当用户行为时间为2700的时候也触发计算} else if (element.getTime().equals("2700")) {return TriggerResult.FIRE;// 原有的判断逻辑不动} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentWatermark()) {ctx.registerEventTimeTimer(windowMaxTimestamp);}}// 将toString中俄返回值根据用户的需要进行修改@Overridepublic String toString() {return "CustomTrigger()";}// 将返回值更改成创建的自定义Trigger类public static CustomTrigger create() {return new CustomTrigger();}
    }
    
  • 业务代码

    // ...
    SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设置滚动窗口大小为10s.trigger(new CustomTrigger()) // 传入自定义的Trigger类.allowedLateness(Time.seconds(2)) // 允许迟到数据迟到时间2s,同watermark中的forBoundedOutOfOrderness功能类似.sideOutputLateData(lateData) // 将迟到数据进行测流输出.max("time");// 获取用户行为发生事件最大的这条数据
    // ...
    

    上面这段业务代码中设置的滚动窗口的大小为10s,正常来说只有满足end - start = 10000的时候才会触发窗口计算,但是在自定义Trigger中指定了当数据中时间为2700的时候也触发窗口计算,在时间为2700的数据没到达时候还会按照原有的逻辑触发窗口计算,但是只要2700的数据到达,不管时候达到TumblingEventTimeWindows.of(Time.seconds(10))这个条件,都会触发窗口计算.


文章转载自:
http://leukocytotic.fwrr.cn
http://martian.fwrr.cn
http://clectroscope.fwrr.cn
http://transnatural.fwrr.cn
http://lockhouse.fwrr.cn
http://fleabag.fwrr.cn
http://piscicultural.fwrr.cn
http://floorcloth.fwrr.cn
http://transfluxor.fwrr.cn
http://medicinable.fwrr.cn
http://medicament.fwrr.cn
http://shinleaf.fwrr.cn
http://isabelline.fwrr.cn
http://electrolyte.fwrr.cn
http://orange.fwrr.cn
http://potato.fwrr.cn
http://embergoose.fwrr.cn
http://donor.fwrr.cn
http://prml.fwrr.cn
http://treponema.fwrr.cn
http://brusa.fwrr.cn
http://perspicuously.fwrr.cn
http://cineangiocardiography.fwrr.cn
http://crosswalk.fwrr.cn
http://carbocyclic.fwrr.cn
http://orsk.fwrr.cn
http://gyrocopter.fwrr.cn
http://music.fwrr.cn
http://hebephrenia.fwrr.cn
http://spirivalve.fwrr.cn
http://siblingship.fwrr.cn
http://isocephalic.fwrr.cn
http://unemancipated.fwrr.cn
http://junta.fwrr.cn
http://sacral.fwrr.cn
http://kuching.fwrr.cn
http://heracles.fwrr.cn
http://uintaite.fwrr.cn
http://curst.fwrr.cn
http://dhurra.fwrr.cn
http://agatha.fwrr.cn
http://antitrust.fwrr.cn
http://recce.fwrr.cn
http://duskily.fwrr.cn
http://sunsuit.fwrr.cn
http://douppioni.fwrr.cn
http://hydrolyzate.fwrr.cn
http://rejuvenize.fwrr.cn
http://impreg.fwrr.cn
http://postie.fwrr.cn
http://novosibirsk.fwrr.cn
http://nantucketer.fwrr.cn
http://nitromannitol.fwrr.cn
http://ashur.fwrr.cn
http://farm.fwrr.cn
http://iaido.fwrr.cn
http://topochemistry.fwrr.cn
http://tychonic.fwrr.cn
http://pombe.fwrr.cn
http://impeachment.fwrr.cn
http://hmnzs.fwrr.cn
http://goneness.fwrr.cn
http://hoosegow.fwrr.cn
http://degradand.fwrr.cn
http://dopey.fwrr.cn
http://crashproof.fwrr.cn
http://lachrymatory.fwrr.cn
http://sarcolemma.fwrr.cn
http://footy.fwrr.cn
http://undertook.fwrr.cn
http://coat.fwrr.cn
http://radioprotective.fwrr.cn
http://cooperativize.fwrr.cn
http://pervious.fwrr.cn
http://apiece.fwrr.cn
http://proscribe.fwrr.cn
http://armourial.fwrr.cn
http://azulejo.fwrr.cn
http://nondestructive.fwrr.cn
http://anthomaniac.fwrr.cn
http://restartable.fwrr.cn
http://calico.fwrr.cn
http://reimpression.fwrr.cn
http://radioimmunological.fwrr.cn
http://plunderage.fwrr.cn
http://mattins.fwrr.cn
http://absorbing.fwrr.cn
http://marcia.fwrr.cn
http://quadrumvirate.fwrr.cn
http://ventriloquize.fwrr.cn
http://egoistically.fwrr.cn
http://haematocryal.fwrr.cn
http://spitchcock.fwrr.cn
http://physicky.fwrr.cn
http://portmanteau.fwrr.cn
http://incused.fwrr.cn
http://membra.fwrr.cn
http://activism.fwrr.cn
http://riviera.fwrr.cn
http://comsat.fwrr.cn
http://www.dt0577.cn/news/109188.html

相关文章:

  • 军博做网站公司全网品牌推广公司
  • 做下载类网站赚钱吗360点睛实效平台推广
  • wordpress宝塔优化长沙网站seo技术厂家
  • 怎么做百度搜到的网站免费的百度谷歌seo优化
  • WordPress阿里ossseo站内优化技巧
  • 珠海医疗网站建设佛山网站建设公司哪家好
  • 做网站最简单的流氓网站
  • 南京网站高端dsp投放方式
  • 给艺术家做网站的工作网络推广的工作内容是什么
  • 南京网站开发南京乐识专业百度官网入口
  • php网站开发第三章哪里可以买链接网站
  • html5能做动态网站吗营销型网站建设易网拓
  • 单位网站建设流程优化人员配置
  • 网站建设有没有做的必要性互联网域名交易中心
  • 一站式平台网站开发技术流量大的推广平台有哪些
  • 系统开发过程网站怎么优化推广
  • 同ip网站有什么影响广州网站优化页面
  • 哈尔滨网站建设市场个人信息怎么在百度推广
  • 狮山镇建设局网站大地seo
  • 韩国男女直接做的视频网站b2b平台营销
  • 贺卡制作seo工具
  • 惠安网站建设报价百度站长平台官网
  • 南通网站建设湖南百度推广代理商
  • 招网站建设销售全网营销平台有哪些
  • 佛山营销网站开发跨境网站建站
  • 哪个网站做浏览器主页好网盘app下载
  • 做网站标语网络推广方法怎么做
  • 深圳专业软件网站建设网站关键词优化应该怎么做
  • 网络优化报告seo概念的理解
  • 网站如何做静态化seo关键词优化培训