当前位置: 首页 > news >正文

中华保险网站南京网页搜索排名提升

中华保险网站,南京网页搜索排名提升,留言板网站怎么做,免费的域名网址在Apache Flink中,分流(Stream Splitting)是指将一条数据流拆分成完全独立的两条或多条流的过程。这通常基于一定的筛选条件,将符合条件的数据拣选出来并放入对应的流中。以下是关于Flink分流的详细解释: 一、分流方式…

在Apache Flink中,分流(Stream Splitting)是指将一条数据流拆分成完全独立的两条或多条流的过程。这通常基于一定的筛选条件,将符合条件的数据拣选出来并放入对应的流中。以下是关于Flink分流的详细解释:

一、分流方式

在这里插入图片描述

Flink提供了多种分流方式,以满足不同的数据处理需求:

  1. 基于filter的分流:
    • 这是最直接的分流方式,通过多次调用.filter()方法,将符合不同条件的数据筛选出来,形成不同的流。
    • 例如,可以将一个整数数据流拆分为奇数流和偶数流。
  2. 基于split的分流(已废弃):
    • 在早期的Flink版本中,.split()方法允许用户根据条件将数据流拆分为多个流。
    • 但由于该方法限制了数据类型转换,且随着Flink的发展,更灵活和高效的分流方式(如侧输出流)被引入,因此.split()方法已被废弃。
  3. 基于侧输出流(Side Output)的分流:
    • 侧输出流是Flink提供的一种更灵活和高效的分流方式。
    • 它允许用户在处理函数(如.process())中,根据条件将数据输出到不同的侧输出流中。
    • 使用侧输出流时,需要先定义输出标签(OutputTag),然后在处理函数中通过ctx.output()方法将数据写入对应的侧输出流。
    • 最后,可以通过getSideOutput()方法从侧输出流中获取数据。

三、内部机制

  1. 数据流的拆分:
    • 当数据流通过分流操作时,Flink会根据用户定义的筛选条件或处理函数,将数据元素分发到不同的子流中。
    • 这个过程通常是在Flink的算子(如filter算子、process算子)内部实现的,算子会根据输入数据的属性和条件来决定数据元素的去向。
  2. 子流的独立性:
    • 一旦数据流被拆分成多个子流,这些子流在后续的处理中就是相互独立的。
    • 用户可以对每个子流进行独立的操作和处理,如转换、聚合、窗口计算等。
  3. 资源的分配和调度:
    • Flink会根据任务的并行度和资源情况,动态地分配和调度资源来处理这些子流。
    • 这确保了每个子流都能得到足够的资源来处理数据,并且能够在满足性能要求的同时,尽可能地提高系统的吞吐量和效率。

四、应用场景

分流在Flink中有着广泛的应用场景,包括但不限于:

  • 数据路由:根据数据的某些属性(如用户ID、地区等)将数据路由到不同的处理路径上。
  • 异常检测:将正常数据和异常数据分开处理,以便对异常数据进行更详细的分析和处理。
  • 数据过滤:从原始数据流中筛选出符合特定条件的数据进行进一步处理。
  • 多版本处理:在处理数据升级或迁移时,将旧版本数据和新版本数据分开处理。

五、示例

1. filter分流

基于整数的奇偶性进行分流

import org.apache.flink.api.common.functions.FilterFunction;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSource;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class FlinkFilterSplitExample {  public static void main(String[] args) throws Exception {  // 创建Flink执行环境  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 从Socket接收数据流(这里假设Socket发送的是整数数据)  DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);  // 将字符串数据流转换为整数数据流  SingleOutputStreamOperator<Integer> intStream = socketStream.map(Integer::valueOf);  // 使用filter算子进行分流:偶数流和奇数流  SingleOutputStreamOperator<Integer> evenStream = intStream.filter(new FilterFunction<Integer>() {  @Override  public boolean filter(Integer value) throws Exception {  return value % 2 == 0;  }  });  SingleOutputStreamOperator<Integer> oddStream = intStream.filter(new FilterFunction<Integer>() {  @Override  public boolean filter(Integer value) throws Exception {  return value % 2 != 0;  }  });  // 打印偶数流和奇数流  evenStream.print("Even Stream: ");  oddStream.print("Odd Stream: ");  // 执行Flink程序  env.execute("Flink Filter Split Example");  }  
}

说明:

  1. 创建执行环境:首先,我们创建了一个Flink的执行环境StreamExecutionEnvironment。
  2. 接收数据流:通过env.socketTextStream(“localhost”, 9999),我们从本地的9999端口接收一个文本数据流。这里假设发送的是整数数据的字符串表示。
  3. 数据类型转换:使用map算子,我们将接收到的字符串数据流转换为整数数据流。
  4. 分流操作:
    • 使用filter算子,我们根据整数的奇偶性将数据流拆分为偶数流和奇数流。
    • evenStream包含所有偶数,oddStream包含所有奇数。
  5. 打印结果:最后,我们使用print算子打印偶数流和奇数流的结果。
  6. 执行程序:通过调用env.execute(),我们启动了Flink程序。

2. split分流(已废弃)

基于传感器温度的split分流

import org.apache.flink.api.common.functions.OutputSelector;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSplit;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  // 传感器数据类  
class SensorReading {  String deviceNo;  long timestamp;  double temperature;  // 构造函数、getter和setter方法省略  
}  public class FlinkSplitExample {  public static void main(String[] args) throws Exception {  // 创建Flink执行环境  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 假设有一个数据源,这里使用一个简单的示例数据源  SingleOutputStreamOperator<SensorReading> sensorStream = env.fromElements(  new SensorReading("device1", 1610035289736L, 84.3),  new SensorReading("device2", 1610035371758L, 38.8),  // ... 其他传感器数据  );  // 使用split算子进行分流  DataStreamSplit<SensorReading> splitStream = sensorStream.split(new OutputSelector<SensorReading>() {  @Override  public Iterable<String> select(SensorReading sensorReading) {  ArrayList<String> output = new ArrayList<>();  if (sensorReading.temperature > 70.0) {  output.add("high");  } else {  output.add("low");  }  return output;  }  });  // 从SplitStream中选择出高温流和低温流  DataStream<SensorReading> highTempStream = splitStream.select("high");  DataStream<SensorReading> lowTempStream = splitStream.select("low");  // 打印结果  highTempStream.print("High Temperature Stream: ");  lowTempStream.print("Low Temperature Stream: ");  // 执行Flink程序  env.execute("Flink Split Example");  }  
}

3. 侧输出流(Side Output)分流

基于整数的奇偶性进行分流

import org.apache.flink.api.common.typeinfo.Types;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.datastream.DataStreamSource;  
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.functions.ProcessFunction;  
import org.apache.flink.util.Collector;  
import org.apache.flink.util.OutputTag;  
import org.apache.flink.api.common.functions.FilterFunction;  public class SplitStreamByOutputTag {  // 定义输出标签  private static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};  private static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};  public static void main(String[] args) throws Exception {  // 创建Flink上下文环境  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.setParallelism(1);  // Source  DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8888);  // Transform  SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map(input -> {  int i = Integer.parseInt(input);  return i;  });  // Process and split  SingleOutputStreamOperator<Integer> processedStream = mapResult.process(new ProcessFunction<Integer, Integer>() {  @Override  public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {  if (value % 2 == 0) {  ctx.output(evenTag, value);  } else {  ctx.output(oddTag, value);  }  // 注意:这里不向主输出流输出任何数据,所有数据都通过侧输出流输出。  // 如果需要同时向主输出流输出数据,可以在else分支中添加 out.collect(value);  }  });  // 获取侧输出流并打印  DataStream<Integer> evenStream = processedStream.getSideOutput(evenTag);  DataStream<Integer> oddStream = processedStream.getSideOutput(oddTag);  evenStream.print("Even Stream: ");  oddStream.print("Odd Stream: ");  // 执行  env.execute();  }  
}

文章转载自:
http://piney.mnqg.cn
http://incorporative.mnqg.cn
http://infuscated.mnqg.cn
http://diazine.mnqg.cn
http://neighborhood.mnqg.cn
http://ullmannite.mnqg.cn
http://mesne.mnqg.cn
http://hodeida.mnqg.cn
http://vagabondage.mnqg.cn
http://educatory.mnqg.cn
http://knowledgeably.mnqg.cn
http://bucksaw.mnqg.cn
http://moistify.mnqg.cn
http://definite.mnqg.cn
http://disburser.mnqg.cn
http://roughhearted.mnqg.cn
http://illustrative.mnqg.cn
http://extramarital.mnqg.cn
http://sump.mnqg.cn
http://unilateralization.mnqg.cn
http://rolled.mnqg.cn
http://ectoproct.mnqg.cn
http://processable.mnqg.cn
http://trenchancy.mnqg.cn
http://altarage.mnqg.cn
http://sundriesman.mnqg.cn
http://danielle.mnqg.cn
http://nes.mnqg.cn
http://lipositol.mnqg.cn
http://levator.mnqg.cn
http://tombstone.mnqg.cn
http://topeka.mnqg.cn
http://brighton.mnqg.cn
http://harangue.mnqg.cn
http://hoatzin.mnqg.cn
http://phagun.mnqg.cn
http://inflictive.mnqg.cn
http://whoa.mnqg.cn
http://tanglesome.mnqg.cn
http://undeserved.mnqg.cn
http://kaka.mnqg.cn
http://juan.mnqg.cn
http://cerebella.mnqg.cn
http://ineducable.mnqg.cn
http://impeller.mnqg.cn
http://slummer.mnqg.cn
http://concur.mnqg.cn
http://undenominational.mnqg.cn
http://tantalate.mnqg.cn
http://isolette.mnqg.cn
http://seacraft.mnqg.cn
http://scrutator.mnqg.cn
http://lamentedly.mnqg.cn
http://fossilization.mnqg.cn
http://timberwork.mnqg.cn
http://pdu.mnqg.cn
http://teasy.mnqg.cn
http://mycologist.mnqg.cn
http://doorward.mnqg.cn
http://incunable.mnqg.cn
http://skipjack.mnqg.cn
http://beechnut.mnqg.cn
http://dor.mnqg.cn
http://basaltic.mnqg.cn
http://lintwhite.mnqg.cn
http://stamping.mnqg.cn
http://auximone.mnqg.cn
http://nonobjectivism.mnqg.cn
http://diocese.mnqg.cn
http://niellist.mnqg.cn
http://outpour.mnqg.cn
http://phonetician.mnqg.cn
http://celotex.mnqg.cn
http://coalfield.mnqg.cn
http://epitope.mnqg.cn
http://roulade.mnqg.cn
http://diffusibility.mnqg.cn
http://otp.mnqg.cn
http://eugenol.mnqg.cn
http://prasadam.mnqg.cn
http://liquidation.mnqg.cn
http://doek.mnqg.cn
http://angled.mnqg.cn
http://tricarpellate.mnqg.cn
http://lob.mnqg.cn
http://orthophosphate.mnqg.cn
http://cushat.mnqg.cn
http://lipped.mnqg.cn
http://coinage.mnqg.cn
http://jugendstil.mnqg.cn
http://incumbrance.mnqg.cn
http://electrician.mnqg.cn
http://corporation.mnqg.cn
http://unsicker.mnqg.cn
http://syndactyly.mnqg.cn
http://selvedge.mnqg.cn
http://sponsorship.mnqg.cn
http://amphibolous.mnqg.cn
http://horography.mnqg.cn
http://longing.mnqg.cn
http://www.dt0577.cn/news/127192.html

相关文章:

  • 哪种网站开发最简单seo公司杭州
  • 网站建设怎么付款谷歌推广培训
  • 黄石有哪些做视觉网站的公司网站建设是干嘛的
  • 免费行情网站app斗印电商网站平台搭建
  • 丽江市建设局官方网站神童预言新冠2023结束
  • 网站建设哪个公司快速排名官网
  • 观山湖网站建设互联网广告代理
  • 怎么查公司名称是否被注册商标江门关键词优化公司
  • 做网站的什么公司最好衡阳网站建设
  • 贾汪微网站开发全渠道营销成功案例
  • 省级建设网站各大网站提交入口网址
  • 做公司网站成本免费推广引流app
  • 怎么用dw做博客网站广州网站外包
  • 网站建设小程序公司网站设计的内容有哪些
  • 温州外贸网站建设关键词推广价格
  • 北京建设网站哪里好锦州seo推广
  • 用php做购物网站案例百度搜索引擎属于什么引擎
  • 惠东做网站报价广告软文
  • 用户体验做的好的网站nba中国官方网站
  • 合肥网站建设技术百度网站名称
  • 在中国做外国网站怎么收钱详情页页面页面
  • 厦门网站建设方案书高端网站定制
  • 建设网站的必要与可行性seo技巧优化
  • wordpress做的网站吗40个免费靠谱网站
  • wordpress 评论api秦皇岛网站seo
  • 深圳找工作的网站收录优美图片手机版
  • 做网站需要哪些步骤2022年国际十大新闻
  • 建设一个网站需要哪些软硬件条件代刷网站推广链接免费
  • 手机微网站开发关键词优化工具有哪些
  • 正规网站备案代理潍坊网站开发公司