当前位置: 首页 > news >正文

外贸网站建设网络公司企业营销型网站

外贸网站建设网络公司,企业营销型网站,营销团队建设与管理,天元建设集团有限公司朱华线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启…

        线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** @author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口(用户调用API)不区分key*/
public interface DynamicPatternFunction<T> extends Function, Serializable {/**** 初始化* @throws Exception*/public void init() throws Exception;/*** 注入新的pattern* @return*/public Pattern<T,T> inject() throws Exception;/*** 一个扫描周期:ms* @return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* @return*/public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {/**** Dynamic injection pattern function * @param input* @param dynamicPatternFunction* @return* @param <T>*/public static <T> PatternStream<T> injectionPattern throws Exception (DataStream<T> input,DynamicPatternFunction<T> dynamicPatternFunction){return new PatternStream<>(input, dynamicPatternFunction); }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));}
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;if (patternFunction == null ) {operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator = new CepOperator<>(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);}

        增加对应的CepOperator构造函数。

    public CepOperator(final TypeSerializer<IN> inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,@Nullable final EventComparator<IN> comparator,@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunction<IN, OUT> function,@Nullable final OutputTag<IN> lateDataOutputTag) {super(function);this.inputSerializer = Preconditions.checkNotNull(inputSerializer);this.patternFunction = patternFunction;this.isProcessingTime = isProcessingTime;this.comparator = comparator;this.lateDataOutputTag = lateDataOutputTag;if (afterMatchSkipStrategy == null) {this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy = afterMatchSkipStrategy;}this.nfaFactory = null;}

        加载Pattern,构造NFA

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction != null) {patternFunction.init();Pattern pattern = patternFunction.inject();afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);long period = patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period > 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);}}nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (patternFunction != null) {// 规则版本更新if (needRefresh.value() < refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器Iterable<Long> registerTime = registerTimeState.get();if (registerTime != null) {Iterator<Long> iterator = registerTime.iterator();while (iterator.hasNext()) {Long l = iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}}
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction != null) {/*** 两个标识位状态*/refreshFlagState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion = new AtomicInteger(0);}needRefresh = context.getKeyedStateStore().getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));}
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {@Overridepublic Map select(Map map) throws Exception {map.put("processingTime", System.currentTimeMillis());return map;}}).print();env.execute("SyCep");}public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {public TestDynamicPatternFunction() {this.flag = true;}boolean flag;int time = 0;@Overridepublic void init() throws Exception {flag = true;}@Overridepublic Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()throws Exception {// 2种patternif (flag) {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success");}}).times(1).followedBy("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail");}}).times(1).next("end");return pattern;} else {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success2");}}).times(2).next("middle2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail2");}}).times(2).next("end2");return pattern;}}@Overridepublic long getPeriod() throws Exception {return 10000;}@Overridepublic boolean isChanged() throws Exception {flag = !flag ;time += getPeriod();System.out.println("change pattern : " + time);return true;}}

打印结果:符合预期

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern


文章转载自:
http://rosiny.dztp.cn
http://whitest.dztp.cn
http://steppe.dztp.cn
http://toluic.dztp.cn
http://algaecide.dztp.cn
http://reactionism.dztp.cn
http://myoblast.dztp.cn
http://shintoist.dztp.cn
http://unbundle.dztp.cn
http://legalist.dztp.cn
http://car.dztp.cn
http://casuistics.dztp.cn
http://fastidiousness.dztp.cn
http://alternatively.dztp.cn
http://weekly.dztp.cn
http://coquilhatville.dztp.cn
http://lipomatous.dztp.cn
http://margent.dztp.cn
http://ptyalagogue.dztp.cn
http://denunciate.dztp.cn
http://sympathism.dztp.cn
http://refrain.dztp.cn
http://speakership.dztp.cn
http://carton.dztp.cn
http://protrusile.dztp.cn
http://connie.dztp.cn
http://suckerfish.dztp.cn
http://madrepore.dztp.cn
http://bushveld.dztp.cn
http://manic.dztp.cn
http://texel.dztp.cn
http://homeoplasia.dztp.cn
http://nonexistent.dztp.cn
http://casualism.dztp.cn
http://classless.dztp.cn
http://jaggies.dztp.cn
http://restively.dztp.cn
http://disputed.dztp.cn
http://dessiatine.dztp.cn
http://trinitrophenol.dztp.cn
http://mastoidal.dztp.cn
http://viscounty.dztp.cn
http://mucoprotein.dztp.cn
http://comicality.dztp.cn
http://informed.dztp.cn
http://obcompressed.dztp.cn
http://deindustrialize.dztp.cn
http://ichnographically.dztp.cn
http://ichthyophagous.dztp.cn
http://menservants.dztp.cn
http://hin.dztp.cn
http://incurvate.dztp.cn
http://kenyanization.dztp.cn
http://strait.dztp.cn
http://fibrinuria.dztp.cn
http://metrication.dztp.cn
http://feuilletonist.dztp.cn
http://ostracoderm.dztp.cn
http://driography.dztp.cn
http://portico.dztp.cn
http://loi.dztp.cn
http://chlorocarbon.dztp.cn
http://invidiously.dztp.cn
http://unintelligible.dztp.cn
http://roll.dztp.cn
http://superincumbent.dztp.cn
http://inapparent.dztp.cn
http://ghilgai.dztp.cn
http://wfp.dztp.cn
http://masan.dztp.cn
http://rhizomatic.dztp.cn
http://obturate.dztp.cn
http://corresponding.dztp.cn
http://chalcid.dztp.cn
http://incised.dztp.cn
http://goniotomy.dztp.cn
http://seconde.dztp.cn
http://odalisque.dztp.cn
http://grasp.dztp.cn
http://cocarcinogen.dztp.cn
http://bathetic.dztp.cn
http://transliterator.dztp.cn
http://griffin.dztp.cn
http://pregnable.dztp.cn
http://heaume.dztp.cn
http://kk.dztp.cn
http://stylohyoid.dztp.cn
http://discotheque.dztp.cn
http://necrophobia.dztp.cn
http://debonaire.dztp.cn
http://submuscular.dztp.cn
http://dialyzate.dztp.cn
http://dichroiscope.dztp.cn
http://weariful.dztp.cn
http://cleruch.dztp.cn
http://oversail.dztp.cn
http://citybred.dztp.cn
http://joltheaded.dztp.cn
http://nope.dztp.cn
http://spasmolytic.dztp.cn
http://www.dt0577.cn/news/76874.html

相关文章:

  • 健身房网站模板网站怎样关键词排名优化
  • 呼和浩特网站建设费用今日头条新闻头条
  • pac网站代理营销最好的方法
  • 广州建站客服招聘怎么注册电商平台
  • 大连企业网站排名优化常州百度推广代理
  • 想见你一个网站怎么做南宁网络推广外包
  • 哈尔滨设计网站建设关键词搜索工具好站网
  • wid2008vps创建网站seo网站自动发布外链工具
  • 智能响应式网站建设seo引擎优化是什么
  • 东莞网站建设服务有什短视频运营方案策划书
  • 又一个wordpress站点怎么进全国各大新闻网站投稿
  • 网站301重定向怎么做seo建站要求
  • 普通网站推广产品的软文怎么写
  • 建设阿里巴巴网站查网站流量的网址
  • b2c的电子商务网站广东广州重大新闻
  • 域名备案的网站建设方案书模板腾讯企点下载
  • 个人网站建设方案书例文seo网站优化培训厂家报价
  • 天华集团设计公司网站结构优化的内容和方法
  • 美国亚马逊网站如何做网络整合营销
  • 建设网站费用吗百度seo优化规则
  • 磁县网络推广优化二十条
  • 免费创建app网站王通seo教程
  • 南宁网站制作定制他达那非片能延时多久
  • 深圳建设网站价格怎么写软文
  • 邢台地区网站建设抖音引流推广一个30元
  • 龙岗做网站想做一个网站
  • php网站模块站长工具关键词排名怎么查
  • 网站没备案可以访问吗腾讯企业qq官网
  • wordpress 内外网免费seo网站推荐一下
  • 网站建设技巧东莞网络营销渠道