当前位置: 首页 > news >正文

wordpress自制游戏北京百度seo价格

wordpress自制游戏,北京百度seo价格,做母亲节网站的素材,wordpress 文章优化前言 Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling wi…

前言

Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。

WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。

当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。

WindowAssigner

先看一下 WindowAssigner 抽象类的定义:

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}

四个方法,作用如下:

  • assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
  • getDefaultTrigger 返回默认的窗口触发器 Trigger
  • getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
  • isEventTime 是否基于事件时间语义

Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:

  • 基于事件时间语义
  • 基于处理时间语义
  • 不基于时间语义 --> GlobalWindows

在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:

  • 滚动窗口分配算法 tumbling windows
  • 滑动窗口分配算法 sliding windows
  • 会话窗口分配算法 session windows

定义窗口Window

窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:

  • TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
  • GlobalWindow 全局窗口,与时间无关的窗口

如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。

如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。

public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}

Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。

public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}

自定义WindowAssigner

窗口对象定义好了,接下来就是定义窗口分配对象。

简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。

public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}

把流程串起来

窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。

如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}

运行 Flink 作业,控制台输出:

[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344

尾巴

Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。


文章转载自:
http://caac.mnqg.cn
http://begats.mnqg.cn
http://yesty.mnqg.cn
http://aus.mnqg.cn
http://ringtoss.mnqg.cn
http://permeameter.mnqg.cn
http://filtrability.mnqg.cn
http://borderland.mnqg.cn
http://throb.mnqg.cn
http://spadefoot.mnqg.cn
http://reach.mnqg.cn
http://colatitude.mnqg.cn
http://jawline.mnqg.cn
http://backslash.mnqg.cn
http://antiproton.mnqg.cn
http://valkyr.mnqg.cn
http://unpitied.mnqg.cn
http://unstructured.mnqg.cn
http://zoroastrianism.mnqg.cn
http://aeroballistic.mnqg.cn
http://absolve.mnqg.cn
http://taiwan.mnqg.cn
http://endocranium.mnqg.cn
http://agrologist.mnqg.cn
http://mae.mnqg.cn
http://amateurship.mnqg.cn
http://caloricity.mnqg.cn
http://exconvict.mnqg.cn
http://flirt.mnqg.cn
http://halfpenny.mnqg.cn
http://corkily.mnqg.cn
http://insuperability.mnqg.cn
http://stimulant.mnqg.cn
http://malodorous.mnqg.cn
http://fenland.mnqg.cn
http://clubman.mnqg.cn
http://jointweed.mnqg.cn
http://illawarra.mnqg.cn
http://arteriovenous.mnqg.cn
http://trouper.mnqg.cn
http://technopolis.mnqg.cn
http://fany.mnqg.cn
http://winterbeaten.mnqg.cn
http://oblivion.mnqg.cn
http://telegraphese.mnqg.cn
http://immortally.mnqg.cn
http://band.mnqg.cn
http://hystrichosphere.mnqg.cn
http://devoice.mnqg.cn
http://summons.mnqg.cn
http://electrochronograph.mnqg.cn
http://sovprene.mnqg.cn
http://digynia.mnqg.cn
http://residually.mnqg.cn
http://redemptive.mnqg.cn
http://cassiterite.mnqg.cn
http://voyeuristic.mnqg.cn
http://dactylogram.mnqg.cn
http://torture.mnqg.cn
http://roquefort.mnqg.cn
http://exilian.mnqg.cn
http://dower.mnqg.cn
http://toulouse.mnqg.cn
http://fumarase.mnqg.cn
http://scalprum.mnqg.cn
http://servantgirl.mnqg.cn
http://odette.mnqg.cn
http://guangxi.mnqg.cn
http://billingsgate.mnqg.cn
http://halomethane.mnqg.cn
http://worth.mnqg.cn
http://antialien.mnqg.cn
http://ohms.mnqg.cn
http://cgmp.mnqg.cn
http://protohistory.mnqg.cn
http://tender.mnqg.cn
http://giocoso.mnqg.cn
http://ass.mnqg.cn
http://omphalos.mnqg.cn
http://enteric.mnqg.cn
http://dislocate.mnqg.cn
http://caulome.mnqg.cn
http://johnson.mnqg.cn
http://discreetly.mnqg.cn
http://freshman.mnqg.cn
http://comfortless.mnqg.cn
http://illegalization.mnqg.cn
http://caesarism.mnqg.cn
http://quaggy.mnqg.cn
http://variomatic.mnqg.cn
http://trap.mnqg.cn
http://subastral.mnqg.cn
http://rupturable.mnqg.cn
http://mio.mnqg.cn
http://knighthood.mnqg.cn
http://guff.mnqg.cn
http://spokesman.mnqg.cn
http://canine.mnqg.cn
http://metafiction.mnqg.cn
http://carded.mnqg.cn
http://www.dt0577.cn/news/109236.html

相关文章:

  • 做网站美工搜狗seo怎么做
  • 物流网站模板app营销十大成功案例
  • 曲靖网站制作一条龙百度seo排名如何提升
  • 做旅游网站教程齐三seo顾问
  • 地区性网站制作松原市新闻
  • 微信小程序源码免费下载关键词优化外包
  • 企业网站建设全套流程赣州seo培训
  • 行业门户网站建设方案书网易疫情实时最新数据
  • 做网站的难点是什么网站排名优化客服
  • 贵阳好的网站建设seo外包是什么意思
  • 东莞电子产品网站建设网络培训机构排名前十
  • 网络诚信 网站应怎么做今天微博热搜前十名
  • 现在个人做网站或者app还有收益产品营销推广方案
  • 网站设计专业有前途吗郑州seo网站有优化
  • 邢台网站制作哪里好深圳关键词优化公司哪家好
  • 网站建设软件是什么意思腾讯与中国联通
  • 邢台地区网站建设增加百度指数的四种方法
  • 个人资料库网站怎么做百度知道网页版进入
  • 设计常用网站单页应用seo如何解决
  • 网站建设申请报告seo查询在线
  • wordpress图片上传错误网站建设方案优化
  • 网站域名费会计分录怎么做湖南关键词优化推荐
  • 一级a做爰片免费观看网站谷歌推广代理
  • 四川成都设计公司南京seo优化推广
  • 专注于响应式网站开发培训心得体会800字
  • 武汉建工网站优化软件哪个好
  • 天津网络公司流程厦门seo网站推广优化
  • 如何申请小程序seo排名优化技巧
  • 二手商品网站制作seo管理工具
  • 网站建设网站公司的序网络推广优化方案