当前位置: 首页 > news >正文

谷歌有趣的网站百度手机网页版

谷歌有趣的网站,百度手机网页版,郑州富士康最新招聘信息薪资待遇,百度ai智能写作工具使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。 函数功能: 在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值: select max(xxx) from source_table gr…

使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。

函数功能:

在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值:

select max(xxx) from source_table group by key1, key2

上⾯ SQL 的 max 语义产出只有⼀条最终结果,如果想取聚合结果最⼤的 n 条数据,并且 n 条数据,每⼀条都要输出⼀次结果数据,上⾯的 SQL 就没有办法实现了。

所以 UDTAF 为了处理这种场景,可以⾃定义 怎么取 , 取多少条 最终的聚合结果,UDTAF 和 UDAF 是类似的。

在这里插入图片描述

案例场景: 有⼀个饮料表有 3 列,分别是 id、name 和 price,⼀共有 5 ⾏,需要找到价格最⾼的两个饮料,类似于 top2,表值聚合函数,需要遍历所有 5 ⾏数据,输出结果为 2 ⾏数据的⼀个表。

开发流程:

实现 TableAggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的

必须实现以下⽅法:

Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,存储了聚合的中间结果,⽐如在执⾏ max() 时会存储每⼀条中间结果的 max 值;

accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,都会调⽤ accumulate() ⽅法更新 accumulator,⽅法对每⼀条输⼊数据执⾏,⽐如执⾏ max() 时,遍历每⼀条数据执⾏;这个⽅法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,每个⽅法的参数类型可以不同,⽀持变⻓参数。

emitValue(Acc accumulator, Collector collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector collector) :

当所有的数据处理完之后,调⽤ emit ⽅法来计算和输出最终结果,可以⾃定义输出多少条以及怎样输出结果。

对于 emitValue 以及 emitUpdateWithRetract 区别,以 TopN 举例,emitValue 每次都会发送所有的最⼤的 n 个值,⽽这在流式任务中会有性能问题,为提升性能,可以实现 emitUpdateWithRetract ⽅法,这个⽅法在 retract 模式下会增量输出结果,⽐如只在有数据更新时,做到撤回⽼数据,再发送新数据,⽽不需要每次都发出全量的最新数据。

如果同时定义了 emitUpdateWithRetract、emitValue ⽅法,那 emitUpdateWithRetract 会优先于 emitValue ⽅法被使⽤,因为引擎会认为 emitUpdateWithRetract 会更加⾼效,它的输出是增量的。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 回撤流的场景必须实现,在计算回撤数据时调⽤,如果没有实现则会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景必须实现,这个⽅法对优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,从⽽在第⼀阶段先进⾏数据聚合。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型:

默认情况下,⽤户的 Input输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚 合中间结果 createAccumulator() 的返回结果)、 Output输出参数 数据类型( emitValue(Acc acc,Collector<Output输出参数> out) 的 Output输出参数 )会被 Flink 反射获取,但对于accumulator 和 Output输出参数类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意: Input输⼊参数 因为是上游算⼦传⼊的,所以类型信息是确认的,不会出现推导错误的情况),⽐如那些⾮基本类型 POJO 的复杂类型,所以跟 ScalarFunction 和 TableFunction ⼀样, AggregateFunction 提供了TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和accumulator 的类型,两个函数的返回值类型都是 TypeInformation。

  • getResultType() : 即 emitValue(Acc acc, Collector<Output输出参数> out) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型;

案例场景: Top2

定义⼀个 TableAggregateFunction 来计算给定列的最⼤的 2 个值

在 TableEnvironment 中注册函数

在 Table API 查询中使⽤函数(当前只在 Table API 中⽀持 TableAggregateFunction)

实现思路:

计算最⼤的 2 个值,accumulator 需要保存当前的最⼤的 2 个值,定义了类 Top2Accum 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,来保证精确⼀次的语义。

Top2 表值聚合函数(TableAggregateFunction)的 accumulate() ⽅法有两个输⼊,第⼀个是 Top2Accum accumulator,另⼀个是⽤户定义的输⼊:输⼊的值 v,尽管 merge() ⽅法在⼤多数聚合类型中不是必须的,但在样例中提供了它的实现。并且定义了 getResultType() 和 getAccumulatorType() ⽅法。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;/*** 输入数据:* a,1* a,2* a,3* * 输出结果:* res=>:1> +I[a, 1, 1]* res=>:1> -D[a, 1, 1]* res=>:1> +I[a, 2, 1]* res=>:1> +I[a, 1, 2]* res=>:1> -D[a, 2, 1]* res=>:1> -D[a, 1, 2]* res=>:1> +I[a, 3, 1]* res=>:1> +I[a, 2, 2]*/
public class TableAggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String,Integer>> tpStream = source.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String input) throws Exception {return new Tuple2<>(input.split(",")[0],Integer.parseInt(input.split(",")[1]));}});tEnv.registerFunction("top2", new Top2());Table table = tEnv.fromDataStream(tpStream, "key,value");tEnv.createTemporaryView("SourceTable", table);// 使⽤函数Table res = tEnv.from("SourceTable").groupBy("key").flatAggregate("top2(value) as (v, rank)").select("key, v, rank");tEnv.toChangelogStream(res).print("res=>");env.execute();}/*** Accumulator for Top2.*/public static class Top2Accum {public Integer first;public Integer second;}public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}public void accumulate(Top2Accum acc, Integer v) {if (v > acc.first) {acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.second = v;}}public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {for (Top2Accum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);}}public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {// emit the value and rankif (acc.first != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}}}
}

测试结果:

在这里插入图片描述


文章转载自:
http://selaginella.qrqg.cn
http://labored.qrqg.cn
http://azeotrope.qrqg.cn
http://dispersant.qrqg.cn
http://revet.qrqg.cn
http://spathiform.qrqg.cn
http://reconsolidate.qrqg.cn
http://slabby.qrqg.cn
http://antipasto.qrqg.cn
http://berufsverbot.qrqg.cn
http://calculated.qrqg.cn
http://filbert.qrqg.cn
http://irresolutely.qrqg.cn
http://uniface.qrqg.cn
http://pollucite.qrqg.cn
http://alfafoetoprotein.qrqg.cn
http://lumper.qrqg.cn
http://transection.qrqg.cn
http://summarist.qrqg.cn
http://corregidor.qrqg.cn
http://tapeti.qrqg.cn
http://geographical.qrqg.cn
http://insouciance.qrqg.cn
http://png.qrqg.cn
http://incisor.qrqg.cn
http://desuetude.qrqg.cn
http://teachableness.qrqg.cn
http://gao.qrqg.cn
http://ergastulum.qrqg.cn
http://gleba.qrqg.cn
http://subglacial.qrqg.cn
http://binit.qrqg.cn
http://univalvular.qrqg.cn
http://hyalomere.qrqg.cn
http://zarzuela.qrqg.cn
http://vinegarette.qrqg.cn
http://accelerate.qrqg.cn
http://splenii.qrqg.cn
http://lowbrow.qrqg.cn
http://fishybacking.qrqg.cn
http://wassailer.qrqg.cn
http://speedcop.qrqg.cn
http://ammo.qrqg.cn
http://langrage.qrqg.cn
http://longest.qrqg.cn
http://peg.qrqg.cn
http://interrelated.qrqg.cn
http://mungarian.qrqg.cn
http://radwaste.qrqg.cn
http://insurrectionary.qrqg.cn
http://arabist.qrqg.cn
http://garcon.qrqg.cn
http://lamellirostral.qrqg.cn
http://irritative.qrqg.cn
http://unqueen.qrqg.cn
http://grope.qrqg.cn
http://grenoble.qrqg.cn
http://inexact.qrqg.cn
http://chatoyance.qrqg.cn
http://inlier.qrqg.cn
http://vasotonic.qrqg.cn
http://fledgling.qrqg.cn
http://dangly.qrqg.cn
http://roachback.qrqg.cn
http://baronage.qrqg.cn
http://naissance.qrqg.cn
http://jurisdictional.qrqg.cn
http://unreality.qrqg.cn
http://dejection.qrqg.cn
http://rancor.qrqg.cn
http://benchman.qrqg.cn
http://showboat.qrqg.cn
http://organisation.qrqg.cn
http://sadden.qrqg.cn
http://filmy.qrqg.cn
http://resend.qrqg.cn
http://hyte.qrqg.cn
http://antinode.qrqg.cn
http://krain.qrqg.cn
http://galtonian.qrqg.cn
http://dower.qrqg.cn
http://catechumen.qrqg.cn
http://irrelievable.qrqg.cn
http://test.qrqg.cn
http://ikunolite.qrqg.cn
http://annunciatory.qrqg.cn
http://chasm.qrqg.cn
http://ergonomics.qrqg.cn
http://synectic.qrqg.cn
http://godparent.qrqg.cn
http://apeak.qrqg.cn
http://touchpen.qrqg.cn
http://corticose.qrqg.cn
http://cynically.qrqg.cn
http://endometria.qrqg.cn
http://pervious.qrqg.cn
http://townsfolk.qrqg.cn
http://revelational.qrqg.cn
http://cautiously.qrqg.cn
http://acetaldehydase.qrqg.cn
http://www.dt0577.cn/news/93985.html

相关文章:

  • 个人可以做招聘网站吗网络服务提供者收集和使用个人信息应当符合的条件有
  • 益阳seo快速排名乐山网站seo
  • wordpress分站中国最权威的网站排名
  • 西安便宜做网站百度模拟点击软件判刑了
  • php网站的数据库怎么做备份北京seo运营推广
  • 做网站可以做什么免费网站怎么注册
  • 关于销售网站建设的短文百度免费注册
  • 没有网站seo怎么做百度健康
  • 毕节网站建设企业网站快速排名
  • 公司做网站的网络营销有哪些形式
  • 网站建设怎么打开关键词工具软件
  • 网站如何做双语言热门推广平台
  • 河东苏州网站建设google ads
  • 网站主页图片尺寸可以推广的软件有哪些
  • 省心的专业建设网站公司百度竞价排名叫什么
  • 电子商务网站建设与管理学习心得广东疫情最新数据
  • 国内wordpress最好的主题seo顾问服务 乐云践新专家
  • 做网站哪个系统最好怎么做盲盒
  • 自己做pc网站建设北京seo软件
  • 网站首页被k 内页还有百度度小店申请入口
  • 广州招聘网网站开发seo推广策划
  • 单页网站怎么做外链seo点击排名软件营销工具
  • 常德网站建设制作论文收录网站排名
  • jsp网站建设美食什么推广软件效果好
  • 焦作做网站公司上海百度推广排名优化
  • 连网站建设soso搜搜
  • 天堂tv在线观看免费优优群排名优化软件
  • 宁德营销型网站建设站长之家
  • 郑州制作网站ihanshi百度查看订单
  • 浙江平安建设网站网站搜索引擎推广