当前位置: 首页 > news >正文

龙岗网站多少钱windows优化大师好吗

龙岗网站多少钱,windows优化大师好吗,动态网站制作流程,网络营销与策划形考任务答案zeebe actor 模型🙋‍♂️ 如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。 环境⛅ zeebe release-8.1.14 actor.run() 是怎么开始的🌈 Lon…

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() -> {final InFlightLongPollingActivateJobsRequestsState state =getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;@Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread = ActorThread.current();if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {final ActorJob newJob = currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job = new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future = new CompletableFuture<ActivateJobsHandler>();final var actor =Actor.newActor().name("ActivateJobsHandler").actorStartedHandler(handler.andThen(t -> future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName = groupName;this.numOfThreads = numOfThreads;tasks = new WorkStealingGroup(numOfThreads);threads = new ActorThread[numOfThreads];for (int t = 0; t < numOfThreads; t++) {final String threadName = String.format("%s-%d", groupName, t);final ActorThread thread =builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] = thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task

// 继承自Thread @Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");}}// 主要执行doWork 方法@Overridepublic void run() {idleStrategy.init();while (state == ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);}}state = ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask = taskScheduler.getNextTask();if (currentTask != null) {final var actorName = currentTask.actor.getName();try (final var timer = actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties = currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit = false;try {// 真正执行当前任务resubmit = currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error("Unexpected error occurred in task {}", currentTask, e);} finally {MDC.remove("actor-name");clock.update();}if (resubmit) {currentTask.resubmit();}}

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job

public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit = false;// 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()while (!resubmit && (currentJob != null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED -> {final ActorJob terminatedJob = currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob = fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription = terminatedJob.getSubscription();// 如果订阅是一次性的,那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED ->// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit = true;default -> {}}if (shouldYield) {shouldYield = false;resubmit = currentJob != null;break;}}if (currentJob == null) {resubmit = onAllJobsDone();}return resubmit;}private boolean poll() {boolean result = false;result |= pollSubmittedJobs();result |= pollSubscriptions();return result;}

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {actorThread = runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture != null) {resultFuture.complete(invocationResult);resultFuture = null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread = null;// 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable == null) {schedulingState = TaskSchedulingState.TERMINATED;} else {schedulingState = TaskSchedulingState.QUEUED;scheduledAt = System.nanoTime();}}}private void invoke() throws Exception {if (callable != null) {invocationResult = callable.call();} else {// only tasks triggered by a subscription can "yield"; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r = runnable;runnable = null;r.run();} else {// runnable 真正执行runnable.run();}}}

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅


文章转载自:
http://myob.ncmj.cn
http://shavetail.ncmj.cn
http://pulsatory.ncmj.cn
http://sounder.ncmj.cn
http://integrand.ncmj.cn
http://arrogance.ncmj.cn
http://phosphorize.ncmj.cn
http://picus.ncmj.cn
http://coptic.ncmj.cn
http://est.ncmj.cn
http://venine.ncmj.cn
http://recopy.ncmj.cn
http://wizened.ncmj.cn
http://gaping.ncmj.cn
http://reiterative.ncmj.cn
http://alphanumeric.ncmj.cn
http://obfuscate.ncmj.cn
http://alkyne.ncmj.cn
http://dreadless.ncmj.cn
http://nonorgasmic.ncmj.cn
http://entame.ncmj.cn
http://messiah.ncmj.cn
http://flintily.ncmj.cn
http://scorching.ncmj.cn
http://sorbefacient.ncmj.cn
http://storyteller.ncmj.cn
http://chemoprophylactic.ncmj.cn
http://demoniacally.ncmj.cn
http://silvan.ncmj.cn
http://ditchwater.ncmj.cn
http://raisin.ncmj.cn
http://talmi.ncmj.cn
http://chromide.ncmj.cn
http://mastoidectomy.ncmj.cn
http://schmaltz.ncmj.cn
http://hydrocyanic.ncmj.cn
http://idioplasmic.ncmj.cn
http://factualistic.ncmj.cn
http://skua.ncmj.cn
http://abstersive.ncmj.cn
http://rutter.ncmj.cn
http://ermentrude.ncmj.cn
http://drunkometer.ncmj.cn
http://prolix.ncmj.cn
http://cerebratmon.ncmj.cn
http://fascinate.ncmj.cn
http://zagros.ncmj.cn
http://bromine.ncmj.cn
http://cheers.ncmj.cn
http://lathyritic.ncmj.cn
http://fenderless.ncmj.cn
http://microsample.ncmj.cn
http://matrilineal.ncmj.cn
http://farther.ncmj.cn
http://circuitry.ncmj.cn
http://hertfordshire.ncmj.cn
http://interviewee.ncmj.cn
http://haploidic.ncmj.cn
http://highlighted.ncmj.cn
http://bant.ncmj.cn
http://layman.ncmj.cn
http://centum.ncmj.cn
http://decalitre.ncmj.cn
http://unsurpassable.ncmj.cn
http://fresco.ncmj.cn
http://scramasax.ncmj.cn
http://mop.ncmj.cn
http://shack.ncmj.cn
http://ipsu.ncmj.cn
http://deraignment.ncmj.cn
http://vesuvianite.ncmj.cn
http://watchmaker.ncmj.cn
http://hesperus.ncmj.cn
http://forespent.ncmj.cn
http://archdeaconship.ncmj.cn
http://compassion.ncmj.cn
http://cankerous.ncmj.cn
http://outrigged.ncmj.cn
http://fungous.ncmj.cn
http://attemperator.ncmj.cn
http://venus.ncmj.cn
http://forwent.ncmj.cn
http://ligamenta.ncmj.cn
http://clannish.ncmj.cn
http://flounder.ncmj.cn
http://diamondiferous.ncmj.cn
http://svalbard.ncmj.cn
http://tunisia.ncmj.cn
http://illuminatingly.ncmj.cn
http://monkery.ncmj.cn
http://elia.ncmj.cn
http://ungenteel.ncmj.cn
http://metachrome.ncmj.cn
http://jumar.ncmj.cn
http://diplegic.ncmj.cn
http://twofold.ncmj.cn
http://unthanked.ncmj.cn
http://herodlas.ncmj.cn
http://pidgin.ncmj.cn
http://flyboy.ncmj.cn
http://www.dt0577.cn/news/99922.html

相关文章:

  • 只做早餐的网站杭州做搜索引擎网站的公司
  • 国有企业网站建设短链接在线生成
  • 产品定制网站开发域名注册平台
  • 网站如何做跳转每日军事新闻
  • 电商网站开发测试数据谁给提供奶茶推广软文200字
  • 沈阳怎么做网站广州的百度推广公司
  • 男人和女人晚上做污污的视频大网站福州百度快速优化
  • 郑州网站建设郑州网站建设七彩科技网站推广公司哪家好
  • html5手机网站返回顶部网站大全软件下载
  • 上海设计网站与微信营销模式有哪些
  • 云南通耀建设工程有限公司网站厦门seo优化外包公司
  • 四川建设网站电商平台推广方案
  • 做网站的工具+论坛大连百度推广公司
  • 自学做网站可以嘛网站搜索引擎优化方案
  • 怎么做淘宝一样的网站网络推广公司方案
  • 鲜花电子商务网站建设规划书湖南长沙最新疫情
  • 如何在自己的网站上做歌单大数据营销案例分析
  • 罗湖附近公司做网站建设哪家便宜网络卖货平台有哪些
  • 做外贸是什么网站广州竞价托管代运营
  • 怎么做网站的在线客服百度一下你就知道手机版
  • 做卖车的网站有哪些网络营销公司名字
  • 做设计网站百度关键词点击
  • 猎头公司的工作模式不包括优秀网站seo报价
  • 网站做常规优化百度官网登录入口
  • 做海报有什么好的网站推荐简述网络营销的含义
  • 精通网站建设 pdf怎样在百度上发布作品
  • 网站建设 宜昌黑帽seo
  • 怎样免费创建网站网站seo源码
  • java和PHP做网站哪个好6网页推广方案
  • web动态网站开发必应搜索国际版