当前位置: 首页 > news >正文

扬州网站建设兼职网上营销网站

扬州网站建设兼职,网上营销网站,东阳网站建设dyfwzx,东台网站制作Joins Inner Join 官网说明:和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名,并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。先创建2个表,两个表的字段是相同的,我想验证…

Joins

Inner Join

官网说明:和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名,并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。

先创建2个表,两个表的字段是相同的,我想验证下,是不是必须两个表列名得不同

orders1 = table_env.from_elements([(1,'Jack', 'FRANCE', 10, datetime.now()+timedelta(hours=1)),(2,'Bob', 'USA', 20, datetime.now()+timedelta(hours=2))],DataTypes.ROW([DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("country", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.INT()),DataTypes.FIELD("r_time", DataTypes.TIMESTAMP(3))]))orders2 = table_env.from_elements([(1,'Jack12', 'FRANCE12', 30, datetime.now()+timedelta(hours=1)),(2,'Bob12', 'USA12', 30, datetime.now()+timedelta(hours=2))],DataTypes.ROW([DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("country", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.INT()),DataTypes.FIELD("r_time", DataTypes.TIMESTAMP(3))]))

试着运行

left = orders1.select(col('id'), col('name'), col('country'))
right = orders2.select(col('id'), col('name'), col('country'))
result = left.join(right).where(col('id') == col('id')).select(col('id'), col('name'), col('country'))
result .execute().print()

报错:说是无法区分开country, name, id 这三个字段,所以看样子真不行

 org.apache.flink.table.api.ValidationException: join relations with ambiguous names: [country, name, id]

所以在生成left/right集合的时候alias下字段名:

left = orders1.select(col('id'), col('name'), col('country'))
right = orders2.select(col('id').alias('id1'), col('name').alias('name1'), col('country').alias('country1'))
result = left.join(right).where(col('id') == col('id1')).select(col('id'), col('name1'), col('country'))
result .execute().print()

这样就能将两个列相同的表进行Inner Join 但是这种方式不太靠谱,等以后有别的方式在补充。

+----+-------------+--------------------------------+--------------------------------+
| op |          id |                          name1 |                        country |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                         Jack12 |                         FRANCE |
| +I |           2 |                          Bob12 |                            USA |
+----+-------------+--------------------------------+--------------------------------+

Outer Join

和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。与innter join 差不多

left = orders1.select(col('id'), col('name'), col('country'))
right = orders2.select(col('id').alias('id1'), col('name').alias('name1'), col('country').alias('country1'))
#左
left_outer_result = left.left_outer_join(right, col('id') == col('id1')).select(col('id'), col('name1'), col('country'))
#右
right_outer_result = left.right_outer_join(right, col('id') == col('id1')).select(col('id'), col('name1'), col('country'))
#全
full_outer_result = left.full_outer_join(right, col('id') == col('id1')).select(col('id'), col('name1'), col('country'))
result.execute().print()

Interval Join

Interval join 是可以通过流模式处理的常规 join 的子集。

Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。

from pyflink.table.expressions import colleft = orders1.select(col('id'), col('name'), col('country'),col('r_time'))
right = orders2.select(col('id').alias('id1'), col('name').alias('name1'), col('country').alias('country1'),col('r_time').alias('r_time1'))joined_table = left.join(right).where((col('id') == col('id1')) & (col('r_time') >= col('r_time1') - lit(1).hours) & (col('r_time') <= col('r_time1') + lit(2).seconds))
result = joined_table.select(col('id'), col('name1'), col('country'), col('r_time1'))
result.execute().print()

+----+-------------+--------------------------------+--------------------------------+-------------------------+
| op |          id |                          name1 |                        country |                 r_time1 |
+----+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           1 |                         Jack12 |                         FRANCE | 2023-02-23 15:51:17.793 |
| +I |           2 |                          Bob12 |                            USA | 2023-02-23 16:51:17.793 |
+----+-------------+--------------------------------+--------------------------------+-------------------------+

Inner Join with Table Function (UDTF)

join 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。 如果表函数调用返回空结果,则删除左侧(外部)表的一行。

通过调用UDTF函数来实现一些数据处理。

from pyflink.table.udf import udtf
from pyflink.common import Row
split_res = table_env.from_elements([("1,2",),("3,4",) ],["a"])
# 注册 User-Defined Table Function
# result_type 参数,声明 split function 的结果类型;
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s):splits = s.split(",")yield splits[0], splits[1]# join
joined_table = split_res.join_lateral(split(col('a')).alias("s", "t"))
result = joined_table.select(col('a'), col('s'), col('t'))
result.execute().print()
+----+--------------------------------+--------------------------------+--------------------------------+
| op |                              a |                              s |                              t |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                            1,2 |                              1 |                              2 |
| +I |                            3,4 |                              3 |                              4 |

这样运行结果是出来了,也没问题但是会报错了,暂时没找到解决办法,后期有机会查查,可能大概是有界流数据运行超时的问题

Py4JJavaError: An error occurred while calling o2665.print.
: java.lang.RuntimeException: Failed to fetch next resultat org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:147)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at sun.reflect.GeneratedMethodAccessor112.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)at java.lang.Thread.run(Thread.java:748)

Union

和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。 并集操作。

#生成2张表,table_env一定是有界的,无界表不支持Union
table1 = table_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table2 = table_env.from_elements([("hello|word", 3), ("abc|def", 4)], ['a', 'b'])
result= table1.union(table2)
result.execute().print()
+--------------------------------+----------------------+
|                              a |                    b |
+--------------------------------+----------------------+
|                        abc|def |                    4 |
|                        abc|def |                    2 |
|                     hello|word |                    3 |
|                     hello|word |                    1 |
+--------------------------------+----------------------+

UnionAll

和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。

UNION ALL 包含重复数据

#生成2张表  支持无界
table1 = table_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table2 = table_env.from_elements([("hello|word", 3), ("abc|def", 4)], ['a', 'b'])
result= table1.union_all(table2)
result.execute().print()

Intersect

和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。如果一条记录在一张或两张表中存在多次,则只返回一条记录,也就是说,结果表中不存在重复的记录。两张表必须具有相同的字段类型。

交集操作

#生成2张表 只支持batch
table1 = table_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table2 = table_env.from_elements([("hello|word", 3), ("abc|def", 4)], ['a', 'b'])
result= table1.intersect(table2)
result.execute().print()

IntersectAll

和 SQL INTERSECT ALL 子句类似。IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次,那么该记录返回的次数同该记录在两个表中都出现的次数一致,也就是说,结果表可能存在重复记录。两张表必须具有相同的字段类型。

#生成2张表 只支持batch
table1 = table_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table2 = table_env.from_elements([("hello|word", 3), ("abc|def", 4)], ['a', 'b'])
result= table1.intersect_all(table2)
result.execute().print()

Minus

和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。左表中的重复记录只返回一次,换句话说,结果表中没有重复记录。两张表必须具有相同的字段类型。

#生成2张表 只支持batch
table1 = table_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table2 = table_env.from_elements([("hello|word", 3), ("abc|def", 4)], ['a', 'b'])
result= table1.minus(table2)
result.execute().print()

MinusAll

和 SQL EXCEPT ALL 子句类似。MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n - m) 次,例如,也就是说结果中删掉了在右表中存在重复记录的条数的记录。两张表必须具有相同的字段类型。

#生成2张表 只支持batch
table1 = table_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table2 = table_env.from_elements([("hello|word", 3), ("abc|def", 4)], ['a', 'b'])
result= table1.minus_all(table2)
result.execute().print()

In

和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中,那么 In 子句返回 true。子查询表必须由一列组成。这个列必须与表达式具有相同的数据类型。

#生成2张表 都支持
left = orders1.select(col('id'), col('name'), col('country'))
right = orders2.select(col('id'))
result = left.select(col('id'), col('name'), col('country')).where(col('id').in_(right))
result.execute().print()
----+-------------+--------------------------------+--------------------------------+
| op |          id |                           name |                        country |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                           Jack |                         FRANCE |
| +I |           2 |                            Bob |                            USA |
+----+-------------+--------------------------------+--------------------------------+

OrderBy

和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。

如果是无界表只能直接对时间属性排序,如果其他属性需要后续的fetch操作

orders = table_env.from_elements([('Jack', 'FRANCE', 10, datetime.now()+timedelta(hours=2)),('Rose', 'ENGLAND', 30, datetime.now()+timedelta(hours=12)),('Jack', 'FRANCE', 20, datetime.now()+timedelta(hours=22)),('Bob', 'CH', 40, datetime.now()+timedelta(hours=32)),('Bob', 'CH', 50, datetime.now()+timedelta(hours=32)),('YU', 'CH', 100, datetime.now()+timedelta(hours=5))],DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("country", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.INT()),DataTypes.FIELD("r_time", DataTypes.TIMESTAMP(3))]))
#时间排序
result=orders.order_by(col('r_time').asc)
result.execute().print()
+--------------------------------+--------------------------------+-------------+-------------------------+
|                           name |                        country |     revenue |                  r_time |
+--------------------------------+--------------------------------+-------------+-------------------------+
|                           Jack |                         FRANCE |          10 | 2023-02-23 19:42:48.538 |
|                             YU |                             CH |         100 | 2023-02-23 22:42:48.538 |
|                           Rose |                        ENGLAND |          30 | 2023-02-24 05:42:48.538 |
|                           Jack |                         FRANCE |          20 | 2023-02-24 15:42:48.538 |
|                            Bob |                             CH |          40 | 2023-02-25 01:42:48.538 |
|                            Bob |                             CH |          50 | 2023-02-25 01:42:48.538 |
+--------------------------------+--------------------------------+-------------+-------------------------+

Offset & Fetch

和 SQL 的 OFFSETFETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。

# 从已排序的结果集中返回前2条记录
result1 = orders.order_by(col('r_time').asc).fetch(2)# 从已排序的结果集中返回跳过1条记录之后的所有记录
result2 = orders.order_by(col('r_time').asc).offset(1)# 从已排序的结果集中返回跳过2条记录之后的前5条记录
result3 = orders.order_by(col('r_time').asc).offset(2).fetch(5)

Insert

和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。 insertInto() 方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释,用 TablePipeline.execute() 来执行。

输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。

#myskintable 必须是已存在的结果表
#简单的例子,仅供参考,就是orders这个表经过一系列的操作后,将结果写入另外一张已存在并且scheam对应的skin_table表中
revenue = orders \.select(col("name"), col("country"), col("revenue")) \.where(col("country") == 'FRANCE') \.group_by(col("name")) \.select(col("name"), orders.revenue.sum.alias('rev_sum')).execute_insert("myskintable")

文章转载自:
http://nymphenburg.mrfr.cn
http://gemologist.mrfr.cn
http://tidytips.mrfr.cn
http://vasovagal.mrfr.cn
http://notoriety.mrfr.cn
http://tug.mrfr.cn
http://riempie.mrfr.cn
http://pinnacle.mrfr.cn
http://graduand.mrfr.cn
http://corundum.mrfr.cn
http://mujik.mrfr.cn
http://unprecise.mrfr.cn
http://stenographic.mrfr.cn
http://turbulency.mrfr.cn
http://norton.mrfr.cn
http://paderborn.mrfr.cn
http://meistersinger.mrfr.cn
http://cut.mrfr.cn
http://lunt.mrfr.cn
http://delusion.mrfr.cn
http://adieux.mrfr.cn
http://candelabra.mrfr.cn
http://superheavy.mrfr.cn
http://festivous.mrfr.cn
http://pinole.mrfr.cn
http://francophile.mrfr.cn
http://brainpan.mrfr.cn
http://external.mrfr.cn
http://elastic.mrfr.cn
http://vitaglass.mrfr.cn
http://leglet.mrfr.cn
http://kandinski.mrfr.cn
http://cyanogenesis.mrfr.cn
http://handmaiden.mrfr.cn
http://pregenital.mrfr.cn
http://acquisitively.mrfr.cn
http://gurgoyle.mrfr.cn
http://sackless.mrfr.cn
http://killtime.mrfr.cn
http://syllabically.mrfr.cn
http://discommodity.mrfr.cn
http://acrocyanosis.mrfr.cn
http://punjabi.mrfr.cn
http://malajustment.mrfr.cn
http://ernestine.mrfr.cn
http://tutorage.mrfr.cn
http://floricultural.mrfr.cn
http://benighted.mrfr.cn
http://prosthesis.mrfr.cn
http://tiercet.mrfr.cn
http://curler.mrfr.cn
http://interpreter.mrfr.cn
http://corybantism.mrfr.cn
http://superdense.mrfr.cn
http://puncture.mrfr.cn
http://levis.mrfr.cn
http://rustle.mrfr.cn
http://backplane.mrfr.cn
http://duomo.mrfr.cn
http://stalklet.mrfr.cn
http://replete.mrfr.cn
http://style.mrfr.cn
http://metagalactic.mrfr.cn
http://religiously.mrfr.cn
http://lightheartedly.mrfr.cn
http://precautious.mrfr.cn
http://unknot.mrfr.cn
http://ambition.mrfr.cn
http://expectation.mrfr.cn
http://crone.mrfr.cn
http://maker.mrfr.cn
http://garryowen.mrfr.cn
http://ptochocracy.mrfr.cn
http://calabazilla.mrfr.cn
http://spindleful.mrfr.cn
http://isinglass.mrfr.cn
http://reimprint.mrfr.cn
http://starlight.mrfr.cn
http://filthify.mrfr.cn
http://patternize.mrfr.cn
http://magellanic.mrfr.cn
http://zinjanthropine.mrfr.cn
http://degression.mrfr.cn
http://leasing.mrfr.cn
http://preregistration.mrfr.cn
http://hythergraph.mrfr.cn
http://tamworth.mrfr.cn
http://entamoeba.mrfr.cn
http://viselike.mrfr.cn
http://rutted.mrfr.cn
http://yellows.mrfr.cn
http://participable.mrfr.cn
http://monasticism.mrfr.cn
http://coaster.mrfr.cn
http://incompetence.mrfr.cn
http://repoussage.mrfr.cn
http://claptrap.mrfr.cn
http://trichopathic.mrfr.cn
http://allecret.mrfr.cn
http://chiengmai.mrfr.cn
http://www.dt0577.cn/news/87057.html

相关文章:

  • 西平县住房城乡建设局网站官方百度
  • 武汉软件培训机构百度app优化
  • access数据库做网站顾问式营销
  • 外贸服装网站模板百度推广代理商与总公司的区别
  • 网站素材网超级优化大师
  • 做动漫网站的心得体会seo原创工具
  • 威县网站建设代理价格aso优化吧
  • 怎么建设公司网站知乎营销平台
  • 手机自适应网站建设网络推广平台大全
  • 深圳网站建设服务公seo怎么优化
  • 光泽县规划建设和旅游局网站个人网站怎么建立
  • 那里有做网站的广州百度竞价开户
  • 网站建设与制作报价上海百度推广优化
  • 网站优化 seo建设网页
  • 做推广需要网站吗推广产品的软文怎么写
  • 商业网站开发各大网站排名
  • 母婴 网站 策划合肥网站制作公司
  • 网络加速器免费郭生b如何优化网站
  • 网站空间月流量百度seo搜索引擎优化
  • 下载一个网站的源码下载企业关键词优化公司
  • 如何建设一个自己 的网站首页学it学费大概多少钱
  • 钉钉企业主页关键词优化报价推荐
  • 国家企业信息系统查询系统官方百度网站排名搜行者seo
  • 山西大同专业网站建设制作价格营销软文范例大全300字
  • 偷拍哪个网站做的好买链接官网
  • 找人做网站需要什么条件北京公司排名seo
  • 织梦做信息分类网站企业如何进行网站推广
  • 个性网站建设百度搜索推广怎么做
  • 社区网站的建设百度账号登录
  • 无锡便宜做网站搜索引擎简称seo