当前位置: 首页 > news >正文

设备 光速东莞网站建设托管竞价账户哪家好

设备 光速东莞网站建设,托管竞价账户哪家好,网页与网站设计nbsp的意思,临清网站建设服务这是一篇介绍PowerJob,Server端Actor的文章,如果感兴趣可以请点个关注,大家互相交流一下吧。 server端一共有两个Actor,一个是处理worker传过来的信息,一个是server之间的信息传递。 处理Worker的Actor叫做WorkerRequ…

这是一篇介绍PowerJob,Server端Actor的文章,如果感兴趣可以请点个关注,大家互相交流一下吧。

 

server端一共有两个Actor,一个是处理worker传过来的信息,一个是server之间的信息传递。

处理Worker的Actor叫做WorkerRequestAkkaHandler,处理Server之间的叫做FriendRequestHandler。从名字来看也是非常的有意思,server之间彼此是朋友,worker之间没有什么朋友,有的只是上下级,说跑偏了。

WorkerRequestAkkaHandler

主要接收五种类型的是消息

  1. 来自worker的心跳信息(确保worker是活着的)

  2. 任务实例的状态信息(查看worker的工作进展)

  3. worker的日志信息(监视worker是工作每一步是否有错误)

  4. worker部署容器的信息(worker额外做了哪些工作)

  5. 查询执行器集群的信息(来新员工要第一时间知道)

心跳信息的发送与接收

timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS);

心跳的发送,是由worker端的WorkerHealthReporter的run方法发送的,该类实现了Runnable接口,在worker启动的时候,被设置成了每隔15秒执行一次,是worker的后台执行的程序。

心跳的接收,是由server端的WorkerRequestAkkaHandler,接收之后将信息存入到内存中,顺便记录日志,可以自行接入到ELK系统中去(如果连接到ELK)。

这一步操作的作用就是确认worker都活着,当有任务来临的时候,将任务发送到所有活着的,或者发送到状态更好的worker去执行。

任务实例的状态信息

发送方主要是TaskTracker,因为TaskTracker是一个抽象类,所以有两个实现类,一个是FrequentTaskTracker,主要负责是秒级任务,一个是CommonTaskTracker,主要负责管理JobInstance的运行,负责任务派发,这三个类均会发送任务实例的状态信息,抽象类TaskTracker主要是在创建任务的时候,如果发生异常,就会向server发送发生异常的任务实例的状态信息,源代码如下:

public static TaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {try {... ...} catch (Exception e) {// 直接发送失败请求TaskTrackerReportInstanceStatusReq response = new TaskTrackerReportInstanceStatusReq();//这就是一堆set信息,没什么好看的...String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());ActorSelection serverActor = workerRuntime.getActorSystem().actorSelection(serverPath);serverActor.tell(response, null);}return null;
}

FrequentTaskTracker主要是在Check内部类里面的的reportStatus方法执行,是一个定时执行的方法。

CommonTaskTracker也是在一个内部类StatusCheckRunnable里面的innerRun方法执行,主要是检查当前任务的执行状态,每隔13秒回执行一次,这个时间可以在启动java的时候设置。

接收端是server的WorkerRequestAkkaHandler类,接收到信息之后更新任务状态,主要代码是InstanceManager的updateStatus方法。源代码如下:为了篇幅不太长,有些日志输出给省略了,大部分都是源代码的注释说明,感觉说的挺详细,还不乏幽默感,所以就保留了。

public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {Long instanceId = req.getInstanceId();// 获取相关数据JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId());InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);if (instanceInfo == null) {return;}// 丢弃过期的上报数据if (req.getReportTime() <= instanceInfo.getLastReportTime()) {return;}// 丢弃非目标 TaskTracker 的上报数据(脑裂情况)if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {return;}InstanceStatus receivedInstanceStatus = InstanceStatus.of(req.getInstanceStatus());Integer timeExpressionType = jobInfo.getTimeExpressionType();// 更新 最后上报时间 和 修改时间instanceInfo.setLastReportTime(req.getReportTime());instanceInfo.setGmtModified(new Date());// 下面这个IF主要是处理FrequentTaskTracker发来的消息// FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可// FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)// 综上,直接把 status 和 runningNum 同步到DB即可if (TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType)) {// 如果实例处于失败状态,则说明该 worker 失联了一段时间,被 server 判定为宕机,而此时该秒级任务有可能已经重新派发了,故需要 Kill 掉该实例if (instanceInfo.getStatus() == InstanceStatus.FAILED.getV()) {stopInstance(instanceId, instanceInfo);return;}LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());// 检查生命周期是否已结束if (lifeCycle.getEnd() != null && lifeCycle.getEnd() <= System.currentTimeMillis()) {stopInstance(instanceId, instanceInfo);instanceInfo.setStatus(InstanceStatus.SUCCEED.getV());} else {instanceInfo.setStatus(receivedInstanceStatus.getV());}instanceInfo.setResult(req.getResult());instanceInfo.setRunningTimes(req.getTotalTaskNum());instanceInfoRepository.saveAndFlush(instanceInfo);// 任务需要告警if (req.isNeedAlert()) {alert(instanceId, req.getAlertContent());}return;}// 更新运行次数if (instanceInfo.getStatus() == InstanceStatus.WAITING_WORKER_RECEIVE.getV()) {// 这里不会存在并发问题instanceInfo.setRunningTimes(instanceInfo.getRunningTimes() + 1);}// QAQ ,不能提前变更 status,否则会导致更新运行次数的逻辑不生效继而导致普通任务 无限重试instanceInfo.setStatus(receivedInstanceStatus.getV());boolean finished = false;if (receivedInstanceStatus == InstanceStatus.SUCCEED) {instanceInfo.setResult(req.getResult());instanceInfo.setFinishedTime(System.currentTimeMillis());finished = true;} else if (receivedInstanceStatus == InstanceStatus.FAILED) {// 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =)if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) {// 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败)instanceInfo.setExpectedTriggerTime(System.currentTimeMillis() + 10000);// 修改状态为 等待派发,正式开始重试// 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖)instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());} else {instanceInfo.setResult(req.getResult());instanceInfo.setFinishedTime(System.currentTimeMillis());finished = true;}}// 同步状态变更信息到数据库instanceInfoRepository.saveAndFlush(instanceInfo);if (finished) {// 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());}}

所谓脑裂问题,就是同一个集群中的不同节点,对于集群的状态有了不一样的理解

worker的日志信息

 

timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

发送方式Worker中的OmsLogHandler类里的LogSubmitter内部类的run方法,也是另起线程进行处理的,将产生的日记内容进行上传,这里面使用了一个锁,保证只有一个线程上传日志。

接收端是server的WorkerRequestAkkaHandler类,接收之后保存到数据库中。

worker部署容器的信息

发送端是Worker的OmsContainerFactory类中的fetchContainer方法,该方法是由WorkActor触发的,当server要部署容器的时候,会向WorkerActor接收,然后调用方法onReceiveServerDeployContainerRequest,方法中判断该容器是否已经保存在本地,如果没有再通过fetchContainer向server的WorkerRequestAkkaHandler发送请求获取容器信息,然后部署。

接收端是server的WorkerRequestAkkaHandler类,接收到信息之后,server会将容器id,name,version和下载的url发回给worker,让worker通过url下载容器进行部署。

查询执行器集群的信息

发送端是worker的TaskTracker类的内部类WorkerDetector的run方法,如果是秒级任务,在任务初始化的时候会设置成每一分钟执行一次,在FrequentTaskTracker的initTaskTracker方法内

scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);

如果是正常的任务,任务类型是Map或者MapReduce,会执行该方法:

if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) { scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); }

接收端是server的WorkerRequestAkkaHandler类,接收之后,将所有可以使用的worker的信息返回。

FriendRequestHandler

主要接收两种类型的信息:

  1. Ping 检测目标机器是否存活(还有和我一个级别的活人吗)

  2. RemoteProcessReq 远程执行命令(告诉你的直接下属干活,我不想得罪人)

检测目标机器是否存活

发送方式server的ServerElectionService类的activeAddress方法,该方法是worker启动的时候连接server时调用acquire服务的时候,会调用该方法,该方法会向worker发送的server地址询问目前存活的所有server地址信息。

触发的起点是在Worker的PowerJobWorker的init()中

serverDiscoveryService.start(timingPool);

=》ServerDiscoveryService的start方法的this.currentServerAddress = discovery();

=》ServerDiscoveryService的discovery方法的result = acquire(这个地址不重要,重要的是调用了这个方法);

=》ServerDiscoveryService的acquire方法的result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));

然后转到了Server的ServerController类的acquireServer方法中

return ResultDTO.success(serverElectionService.elect(appId, protocol, currentServer));

=》ServerElectionService的elect方法的return getServer0(appId, protocol);

=》ServerElectionService的getServer0方法的String activeAddress = activeAddress(originServer, downServerCache, protocol);

=》ServerElectionService的activeAddress方法的CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));

以上就是调用前的全部步骤了。

接收方是server的FriendRequestHandler,返回给询问方目前所有存活的server地址。

远程执行命令

发送方式server中JobServer上的注解DesignateServer的切面方法,在server执行某个任务时,会对当前worker的直属server进行判断,如果worker的直属server是当前调度任务的server,则直接执行,如果不是,则会将该方法发送给直属server进行执行。

比如说立即执行任务的命令,会在JobService的runJob中执行,但是该方法上有一个注解@DesignateServer,这也就会在方法执行之前,调用DesignateServerAspect的execute方法,如果将目标server地址与本地地址进行对比不一样,则会执行该远程方法。

接收方是server的FriendRequestHandler,接到执行方法的类名,方法名,入参和返回值等信息,执行方法。执行方法是在RemoteRequestProcessor类中。

总结

 server的这两个Actor职责划分还是很清晰的,不过感觉将Actor仅仅只是用在通信上,有点大材小用的感觉,Actor这个单词本身就是将其比作一个演员,应该是扮演某个角色,当然了,让其仅仅扮演一个手机,可能也是个不错的想法。Akka-remote的底层是Netty,如果直接使用Netty估计也可以,只不过Akka将其进行了封装,使用起来能够更方便一些,不过就是给人一种用大炮打蚊子的感觉。


文章转载自:
http://hypnotize.Lnnc.cn
http://alacritous.Lnnc.cn
http://fumarole.Lnnc.cn
http://schefflera.Lnnc.cn
http://anne.Lnnc.cn
http://bfa.Lnnc.cn
http://underscrub.Lnnc.cn
http://keatite.Lnnc.cn
http://trainsick.Lnnc.cn
http://pyrenoid.Lnnc.cn
http://blackboard.Lnnc.cn
http://mutagenesis.Lnnc.cn
http://lanciform.Lnnc.cn
http://spiroplasma.Lnnc.cn
http://transversely.Lnnc.cn
http://herefrom.Lnnc.cn
http://amorism.Lnnc.cn
http://exsufflate.Lnnc.cn
http://pyromagnetic.Lnnc.cn
http://wishful.Lnnc.cn
http://counterpart.Lnnc.cn
http://uplink.Lnnc.cn
http://thalian.Lnnc.cn
http://disinterment.Lnnc.cn
http://macilent.Lnnc.cn
http://camper.Lnnc.cn
http://originative.Lnnc.cn
http://abjective.Lnnc.cn
http://sporocyte.Lnnc.cn
http://jenghiz.Lnnc.cn
http://thyself.Lnnc.cn
http://patricentric.Lnnc.cn
http://zymosthenic.Lnnc.cn
http://monochromical.Lnnc.cn
http://cornaceous.Lnnc.cn
http://gurdwara.Lnnc.cn
http://lichenin.Lnnc.cn
http://correlativity.Lnnc.cn
http://extrarenal.Lnnc.cn
http://material.Lnnc.cn
http://bobotie.Lnnc.cn
http://datagram.Lnnc.cn
http://superhighway.Lnnc.cn
http://graffito.Lnnc.cn
http://armistice.Lnnc.cn
http://mistily.Lnnc.cn
http://counterplan.Lnnc.cn
http://endoplast.Lnnc.cn
http://kingmaker.Lnnc.cn
http://flossy.Lnnc.cn
http://remorsefully.Lnnc.cn
http://fop.Lnnc.cn
http://hallux.Lnnc.cn
http://apothecial.Lnnc.cn
http://collimate.Lnnc.cn
http://submuscular.Lnnc.cn
http://argumentatively.Lnnc.cn
http://throat.Lnnc.cn
http://skimboard.Lnnc.cn
http://coprecipitate.Lnnc.cn
http://quickly.Lnnc.cn
http://soapie.Lnnc.cn
http://llc.Lnnc.cn
http://anchorman.Lnnc.cn
http://dioxin.Lnnc.cn
http://jupiter.Lnnc.cn
http://trichogen.Lnnc.cn
http://induplicate.Lnnc.cn
http://viewsite.Lnnc.cn
http://reshuffle.Lnnc.cn
http://serific.Lnnc.cn
http://knuckle.Lnnc.cn
http://epidural.Lnnc.cn
http://aujus.Lnnc.cn
http://biocoenose.Lnnc.cn
http://gabionade.Lnnc.cn
http://prudish.Lnnc.cn
http://bagnio.Lnnc.cn
http://miaul.Lnnc.cn
http://uncock.Lnnc.cn
http://sundays.Lnnc.cn
http://stirabout.Lnnc.cn
http://millcake.Lnnc.cn
http://polatouche.Lnnc.cn
http://staffman.Lnnc.cn
http://overmuch.Lnnc.cn
http://rune.Lnnc.cn
http://gct.Lnnc.cn
http://hornist.Lnnc.cn
http://ribband.Lnnc.cn
http://countryward.Lnnc.cn
http://hypopharyngoscope.Lnnc.cn
http://plesiosaur.Lnnc.cn
http://punctually.Lnnc.cn
http://plantain.Lnnc.cn
http://collarless.Lnnc.cn
http://stingo.Lnnc.cn
http://boundary.Lnnc.cn
http://inconceivability.Lnnc.cn
http://interwound.Lnnc.cn
http://www.dt0577.cn/news/120952.html

相关文章:

  • python做网站的优势seo公司关键词
  • 做企业网站所需要的资料网络营销渠道的功能
  • 做ppt素材网站哪个好优书网
  • 做五金出口在哪个网站好点热搜词工具
  • 网站建设百度不通过小网站
  • 仕德伟做的网站图片怎么修网站排名查询工具有哪些
  • 电力公司在哪个网站做推广最好电商网络推广
  • 外贸流程中有哪些主体单位网站seo优化多少钱
  • 网站开始是怎么做的百度小说风云排行榜
  • 做网站应该了解什么软件百度seo网站
  • 有需求或做任务赚钱的网站么怎么自己做网页
  • 手机与电脑网站制作企业宣传推广
  • 常州网站设计手机百度极速版
  • 怎么网站建设怎么样百度竞价开户3000
  • 20g虚拟主机建设网站百度云官网登录入口
  • 滨江区网站开发公司网络营销团队
  • 网络投放广告平台seo营销方法
  • 武汉建站模板源码网络优化培训
  • wordpress做seo好做5g网络优化工程师
  • 成都私人做网站百度seo优化收费标准
  • 怎么制作爆米花教程windows优化大师是自带的吗
  • 下载男女做爰免费网站外包客服平台
  • 郑州网站建设郑州网站建设太原网站关键词推广
  • 响应式食品企业网站网络营销工具与方法
  • 做数据的网站有哪些seo排名诊断
  • 乌兰察布网站建设seo基础教程
  • 开网站开发公司推广网站的四种方法
  • 网站建设公司格近期国际热点大事件
  • pycharm 做网站哪个好seo外链建设的方法有
  • 做平台的网站有哪些内容如何利用互联网宣传与推广