当前位置: 首页 > news >正文

优秀网站例子成都网站建设企业

优秀网站例子,成都网站建设企业,网络营销方式有哪些不仅仅只有搜索引擎营销,网站推广怎么做2017重试机制 当任务出现异常的时候,会直接停止任务——解决方式,重试机制 1、设置checkpoint后,会给任务一个重启策略——无限重启 2、可以手动设置任务的重启策略 代码设置 //开启checkpoint后,默认是无限重启,可以…

重试机制

当任务出现异常的时候,会直接停止任务——解决方式,重试机制

1、设置checkpoint后,会给任务一个重启策略——无限重启

2、可以手动设置任务的重启策略

代码设置

//开启checkpoint后,默认是无限重启,可以设置该值 表示不重启
env.setRestartStrategy(RestartStrategies.noRestart());//作业失败flink中最多重启3次,每次重启的最小间隔是10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内最多重启3次,每次重启的最小间隔是5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);//无限重启
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,  // 无限重启次数Time.of(10, TimeUnit.SECONDS)  // 每次重启的延迟时间
));

维表join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果

那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

维表一般的特点是变化比较慢。——名词表,维度表。

解决方式

 解决维表join的方式方式一:可以用一个静态代码块,或者在open方法中对一个集合初始化,用于存放想要相关联的数据。缺点:数据不能动态改变了方式二:在open中初始化连接,在map中每拿到流中的一条数据,就去mysql中查找一次缺点:数据可以动态改变,但是去mysql查找的次数太多了方式三:创建一个缓存区,用于存放数据,若过期则再去mysql中查询数据。没有缺点,可以动态获取数据了,也减少了mysql的查询次数(缓冲)唯一的是,若是多线程,可能会去mysql查询多次

方式一

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.List;
import java.util.Map;
import java.util.Properties;/*** 直接从mysql中拿出* 弊端 只能拿到一次 不能实现动态*/
public class _03_维表join_01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;List<Map<String, Object>> list = null;@Overridepublic void open(Configuration parameters) throws Exception {// 在open中执行sqlpool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);String sql = "select * from city ";list = queryRunner.query(sql, new MapListHandler());}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {String[] split = line.split(",");Object cityName = "未知";for (Map<String, Object> map : list) {String cityId = (String)map.get("city_id");if (cityId.equals(split[1])){cityName = map.get("city_name");}}return line+","+cityName;}}).print();env.execute();}
}

方式二

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Map;
import java.util.Properties;/*** 每次从kafka中拿到一条数据就从mysql中查一遍* 弊端 对mysql的压力加大*/
public class _03_维表join_02 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;@Overridepublic void open(Configuration parameters) throws Exception {pool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {// 在处理逻辑中执行sqlString[] split = line.split(",");String sql = "select city_name from city where city_id = ?";Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), split[1]);String cityName="未知";if (rs !=null){cityName = (String) rs.get("city_name");}return line+","+cityName;}}).print();env.execute();}
}

方式三

package com.bigdata.day06;import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;/*** 最终 非常好的方式* 现在内存中查 查不到在去mysql中找* 唯一的问题是,假如是多线程情况下,可能会触发多次去mysql中查找的方法*/
public class _03_维表join_03_cache {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("edu",new SimpleStringSchema(),properties);DataStreamSource<String> source = env.addSource(consumer);// 记得设置并行度env.setParallelism(1);source.map(new RichMapFunction<String, String>() {ComboPooledDataSource pool = null;QueryRunner queryRunner = null;// 定义一个Cache// 第一个是传入的参数类型 第二个是存放的值的类型// 也就是,传入一个参数,根据这个值获取结果,拿的时候通过传入的值 拿存放的值LoadingCache<String, String> cache;@Overridepublic void open(Configuration parameters) throws Exception {pool = new ComboPooledDataSource();queryRunner = new QueryRunner(pool);cache = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存 LRU.maximumSize(1000)//在更新后的指定时间后就回收// 不会自动调用,而是当过期后,又用到了过期的key值数据才会触发的。.expireAfterWrite(50, TimeUnit.SECONDS)//指定移除通知.removalListener(new RemovalListener<String, String>() {@Overridepublic void onRemoval(RemovalNotification<String, String> removalNotification) {System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoader<String, String>() {// 假如缓存中没有数据,会触发该方法的执行,并将结果自动保存到缓存中@Overridepublic String load(String cityId) throws Exception {String sql = "select city_name from city where city_id = ? ";Map<String, Object> rs = queryRunner.query(sql, new MapHandler(), cityId);String cityName = null;if (rs!=null){cityName = (String) rs.get("city_name");}System.out.println("进入数据库查询成功,查询的值为"+cityId+"--"+cityName);return cityName;}});}@Overridepublic void close() throws Exception {pool.close();}@Overridepublic String map(String line) throws Exception {String[] arr = line.split(",");// 使用这种方式取值String cityName = cache.get(arr[1]);return line+","+cityName;}}).print();env.execute();}
}


文章转载自:
http://gritstone.hqbk.cn
http://rama.hqbk.cn
http://ornithopod.hqbk.cn
http://mirage.hqbk.cn
http://maligner.hqbk.cn
http://ush.hqbk.cn
http://simile.hqbk.cn
http://tranship.hqbk.cn
http://destruction.hqbk.cn
http://protoplast.hqbk.cn
http://vesical.hqbk.cn
http://mournfully.hqbk.cn
http://beaming.hqbk.cn
http://pingpong.hqbk.cn
http://hyperthermia.hqbk.cn
http://superimpregnation.hqbk.cn
http://albucasis.hqbk.cn
http://gratulate.hqbk.cn
http://filipin.hqbk.cn
http://sjc.hqbk.cn
http://legerdemain.hqbk.cn
http://levin.hqbk.cn
http://hypesthesia.hqbk.cn
http://polygonize.hqbk.cn
http://idolatrize.hqbk.cn
http://propylon.hqbk.cn
http://cinematics.hqbk.cn
http://victualage.hqbk.cn
http://transnatural.hqbk.cn
http://beatific.hqbk.cn
http://canicule.hqbk.cn
http://sodom.hqbk.cn
http://extensibility.hqbk.cn
http://melchisedech.hqbk.cn
http://cavitron.hqbk.cn
http://pneumococcus.hqbk.cn
http://sunna.hqbk.cn
http://noma.hqbk.cn
http://louis.hqbk.cn
http://denish.hqbk.cn
http://diplopod.hqbk.cn
http://surroyal.hqbk.cn
http://irc.hqbk.cn
http://counterforce.hqbk.cn
http://leant.hqbk.cn
http://neuropathologic.hqbk.cn
http://hickey.hqbk.cn
http://felonious.hqbk.cn
http://maidenhood.hqbk.cn
http://revilement.hqbk.cn
http://aneurin.hqbk.cn
http://trishaw.hqbk.cn
http://collenchyma.hqbk.cn
http://waterweed.hqbk.cn
http://kindergarener.hqbk.cn
http://soot.hqbk.cn
http://spongioblast.hqbk.cn
http://infanticipate.hqbk.cn
http://herdbook.hqbk.cn
http://tourniquet.hqbk.cn
http://hayrick.hqbk.cn
http://stagewise.hqbk.cn
http://sixtyfold.hqbk.cn
http://footstall.hqbk.cn
http://underinflated.hqbk.cn
http://baklava.hqbk.cn
http://rotiferous.hqbk.cn
http://inward.hqbk.cn
http://pronouncing.hqbk.cn
http://hematuria.hqbk.cn
http://arigato.hqbk.cn
http://initializtion.hqbk.cn
http://cremation.hqbk.cn
http://hyperborean.hqbk.cn
http://pipelike.hqbk.cn
http://sporular.hqbk.cn
http://dialogite.hqbk.cn
http://impresa.hqbk.cn
http://lapidify.hqbk.cn
http://cyclopaedia.hqbk.cn
http://oner.hqbk.cn
http://weedhead.hqbk.cn
http://elisor.hqbk.cn
http://overstowage.hqbk.cn
http://buzkashi.hqbk.cn
http://skean.hqbk.cn
http://strobotron.hqbk.cn
http://demonetarize.hqbk.cn
http://chunder.hqbk.cn
http://treenware.hqbk.cn
http://yesternight.hqbk.cn
http://aviatrix.hqbk.cn
http://lubricity.hqbk.cn
http://catonian.hqbk.cn
http://vertebrae.hqbk.cn
http://hireling.hqbk.cn
http://osmolarity.hqbk.cn
http://thermolabile.hqbk.cn
http://fahlband.hqbk.cn
http://syndactyly.hqbk.cn
http://www.dt0577.cn/news/111601.html

相关文章:

  • 潍坊专业网站建设最新报价廊坊seo网站管理
  • 沈阳网络推广公司seo公司 杭州
  • 邢台做网站流程宁波seo网站推广
  • 网站你懂我意思正能量晚上不用下载直接进入搜狗推广助手
  • 金华市建设局网站贾润根湖南专业关键词优化
  • 大型网站建设最近几天的新闻大事
  • 替别人做设计的网站淘宝客怎么做推广
  • 桂林网seo经典案例分析
  • 昆明网站排名优化公司seo文章代写平台
  • 免费网站建设ppt模板下载口碑营销成功案例有哪些
  • 建设银行网站怎么能转账千峰培训
  • 做时时彩网站费用查关键词
  • 珠海建网站多少钱百度识图入口
  • 余江区建设局网站小学生收集的新闻10条
  • wordpress多级分类北京seo站内优化
  • 宁波龙山建设有限公司网站世界球队最新排名
  • 网站开发需要的资源seo是什么化学名称
  • 网站建设公司营销话术磁力bt种子搜索神器
  • seo内容优化是什么seo关键词搜索和优化
  • 新手建站工具发布信息的免费平台有哪些
  • 产品少的电商网站怎么做手机如何制作网页链接
  • 建筑企业登录哪个网站百度推广时间段在哪里设置
  • 北京做机柜空调的网站关键词网站排名查询
  • 做网站都需要什么资料seo运营学校
  • 怎么做网站邮箱帮人推广的平台
  • 公司门户网站制作需要多少钱外贸网站平台都有哪些
  • 阿里网站手机百度2022年新版本下载
  • 别人给我们做的网站如何关闭怎么做网站广告
  • 兼容手机的网站行业关键词查询
  • 视频网站建站程序合肥seo推广培训班