当前位置: 首页 > news >正文

企业网站开发中文摘要学生个人网页制作html代码

企业网站开发中文摘要,学生个人网页制作html代码,互联网出版中的网站建设策划,南通网站建设祥云本篇主要讲述pulsar topic部分,主要从设计以及源码的视角进行讲述。在pulsar中,一个Topic的新建、扩容以及删除操作都是由Broker来处理的,而Topic相关的数据是存储在zookeeper上的。本篇文章模拟一个高效的学习流程进行展开 介绍使用方式(To…

本篇主要讲述pulsar topic部分,主要从设计以及源码的视角进行讲述。在pulsar中,一个Topic的新建、扩容以及删除操作都是由Broker来处理的,而Topic相关的数据是存储在zookeeper上的。本篇文章模拟一个高效的学习流程进行展开

  1. 介绍使用方式(Topic操作指令)
  2. 从高纬度俯视调用流程(从服务层面看Topic的调用流程)
  3. 逐步切入某个具体的操作进一步展开内部的调用流程(从代码层面看具体的调用流程)
  4. 看具体的实现(看代码具体的实现)
Topic操作指令

在日常对pulsar Topic操作时,咱们常常会用到以下指令

//1. Topic创建
pulsar-admin topics create-partitioned-topic \persistent://my-tenant/my-namespace/my-topic \--partitions 4
//2. Topic扩容
pulsar-admin topics update-partitioned-topic \persistent://my-tenant/my-namespace/my-topic \--partitions 8
//3. Topic删除
pulsar-admin topics delete persistent://test-tenant/ns1/tp1
//4. Topic unload
pulsar-admin topics unload persistent://test-tenant/ns1/tp1

更多的操作可以参考 https://pulsar.apache.org/docs/3.0.x/admin-api-topics/

在这里列举了针对分区并存储Topic的四个操作指令

  1. Topic创建,通过指令可以在指定的租户以及命名空间下创建Topic并指定分区数
  2. Topic扩容,这是业务场景为了提升性能时通常会用到的操作,增加指定Topic的分区数
  3. Topic删除,删除不用的Topic,在删之前一定要在监控上确保此Topic的上下游都没有被使用
  4. Topic unload,重制Topic状态

以上就是使用方式,打个比方就是猪可以用来烹饪,猪脑可以用来烤脑花等等

服务层面调用流程

在这里插入图片描述

Pulsar用户(管理员) 跟Pulsar集群的交互流程可以提炼成上面这张表,从左往右看

  1. 用户是集群管理员或者Pulsar的用户,在跟Pulsar集群交互时,可以通过Pulsar提供的上面三种方式。上面提供的操作指令就是第一种shell命令的方式,还可以直接通过http访问rest接口方式以及使用Pulsar针对各个编程语言提供的sdk包进行操作
  2. Pulsar集群Topic相关的元数据全部存储在Zookeeper中,同时为了避免高频访问Zookeeper导致的性能瓶颈,Pulsar在自身服务增加了Cache,在本地内存进行元数据的存储。同时在Pulsar Broker启动时会在Zookeeper添加Watcher来感知元数据的变动,如果有变动会同步更新本地Cache。在有Topic元数据相关的查操作时,会优先查询本地Cache
  3. Pulsar Topic里的数据会存在Bookkeeper中,在对Topic新增/删除时,会调用Bookkeeper来创建/删除 Bookkeeper中相关的数据

以上就是高纬度俯视调用流程,打个比方就是猪的骨架以及神经脉络的构成,这个设计/调用流程在大的方向上已经将Pulsar定型了。在学习的过程中切忌一下子就钻入细节(除非是急着解决问题),一定要有一个清晰的全貌认识,在根据需要逐步切入具体的细节

代码层调用流程

在对全貌有了解后,咱们开始从代码实现层面来看调用流程

首先先看通用的,无论是新增、扩容、卸载还是删除,Pulsar都需要让所有Broker节点感知到元数据的变化。通过下图来看看Pulsar是怎么做的
在这里插入图片描述

如上图所示,Pulsar Broker在启动的时候,会通过ZooKeeperCache对象的构造函数中创建一个ZookeeperClient对象,其通过watcher方式来监听 Zookeeper中 /brokers/topics 路径下数据的变动;在Topic新建时,本质上就是在 /brokers/topics目录下新建一个 Topic名称的子目录,Topic删除本质就是删除此目录,而扩容的本质就是变更 /brokers/topics/topic名称 中分区的信息。因此Pulsar中Topic的变更感知其实就是通过Zookeeper提供的一致性写入以及watcher来实现的,这也是大部分组件元数据变更的实现方案。通过上述可以看到Pulsar在感知到Topic元数据的变动后只做了一件事,就是同步刷新本地的缓存。

在知道Pulsar是怎么实现的Topic变更感知后,接下来看看它的Topic新建流程
在这里插入图片描述
如上图所示,在我们通过指令调用Pulsar创建Topic后,Pulsar Broker会调用ZKMetadataStore的put方法进行处理,其内部罪关键的操作就是调用Zookeeper的客户端在 Zookeeper的服务端 /brokers/topics目录下创建Topic名称的新目录,同时更新本地缓存AsyncLoadingCache。至此Topic创建的流程就结束了,可以负责消息的读写操作。其他扩容、删除等操作也类似,在这里就不一一例举了。

以上就是代码的调用流程,打个比方就是猪的心脏、猪蹄的构成,在深入细节时要不断的在头脑或者笔记中梳理细节的脉络,否则很容易迷失在这里。

代码实现

在对各个操作的调用流程了解了之后,咱们的脑海中已经有一副Pulsar Topic相关操作的地图了,现在就让咱们根据地图去探索具体的“宝藏”吧。

咱们先来看看Topic创建的代码实现,为了避免迷路,博主在下面先整理调用栈,之后再针对具体的核心代码进行讲解

PersistentTopics#createPartitionedTopic //Topic创建 服务端代码入口,仅做了Topic相关的格式、权限的校验AdminResource#internalCreatePartitionedTopic //异步检查Topic是否存在以及异步调用创建TopicAdminResource#tryCreatePartitionsAsync //根据分区数循环调用异步方法tryCreatePartitionAsyncAdminResource#tryCreatePartitionAsync //调用zk服务端来创建Topic具体的某个分区ZKMetadataStore#putZKMetadataStore#storePutZKMetadataStore#storePutInternalZooKeeper#setData //这里就是创建Topic最关键的地方,也就是在Zookeeper服务端创建新Topic目录下的分区信息

整个调用链路还是非常清晰的,因为相比kafka而言,Pulsar是无主架构,不需要做选主、一致性相关的操作,所以代码难度整体并不算高。不过pulsar为了性能大量使用了异步处理搭配lambda操作,对于不熟悉的读者会有点难度。

下面让咱们深入看下实现细节,首先是入口 PersistentTopics#createPartitionedTopic

    public void createPartitionedTopic(...) {try {validateGlobalNamespaceOwnership(tenant, namespace);  //校验当前Topic的租户-namespace二级目录是否有效validatePartitionedTopicName(tenant, namespace, encodedTopic); //未知校验什么//判断当前操作是否允许validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); validateCreateTopic(topicName); //校验Topic命名,避免跟服务内部Topic冲突//真正调用创建Topic的方法internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);} catch (Exception e) {//}}

因此我们可以看到主要就是做相关的校验,主要的逻辑交给下一层AdminResource#internalCreatePartitionedTopic,进一步看实现

    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,boolean createLocalTopicOnly) {Integer maxTopicsPerNamespace = null;try {//获取当前Namespace的策略,用于校验Policies policies = getNamespacePolicies(namespaceName);maxTopicsPerNamespace = policies.max_topics_per_namespace;} catch (RestException e) {//....}try {if (maxTopicsPerNamespace > 0) {List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);//校验当前namespace下的Topic数量已经达到限额,到的话则创建新Topic失败if (partitionedTopics.size() + numPartitions > maxTopicsPerNamespace) {log.error("[{}] Failed to create partitioned topic {}, "+ "exceed maximum number of topics in namespace", clientAppId(), topicName);resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.PRECONDITION_FAILED,"Exceed maximum number of topics in namespace."));return;}}} catch (Exception e) {//....}final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();try {//对namespace的操作进行校验validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC);} catch (Exception e) {//....}//对分区数进行下界校验if (numPartitions <= 0) {asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE,"Number of partitions should be more than 0"));return;}//对分区数进行上界校验if (maxPartitions > 0 && numPartitions > maxPartitions) {asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE,"Number of partitions should be less than or equal to " + maxPartitions));return;}List<CompletableFuture<Void>> createFutureList = new ArrayList<>();CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();createFutureList.add(createLocalFuture);//异步调用检查Topic是否已存在checkTopicExistsAsync(topicName).thenAccept(exists -> {if (exists) {log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));return;}//核心操作,异步调用创建Topic操作provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly).thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions)).whenComplete((ignored, ex) -> {if (ex != null) {createLocalFuture.completeExceptionally(ex);return;}createLocalFuture.complete(null);});}).exceptionally(ex -> {//....});//如果这个Topic是全局的,那么还会调用其他pulsar集群异步创建这个Topic//这里控制多个并发请求结束处理的设计值得借鉴,通过轮训异步对象容器进行结果处理if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {getNamespaceReplicatedClusters(namespaceName).stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName())).forEach(cluster -> createFutureList.add(((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()).createPartitionedTopicAsync(topicName.getPartitionedTopicName(), numPartitions, true)));}FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {if (ex != null) {log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());if (ex.getCause() instanceof RestException) {asyncResponse.resume(ex.getCause());} else {resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());}return;}log.info("[{}] Successfully created partitions for topic {} in cluster {}",clientAppId(), topicName, pulsar().getConfiguration().getClusterName());asyncResponse.resume(Response.noContent().build());});}

这个方法的逻辑有些多,但整体也是比较清晰的,接下来进一步看看AdminResource#tryCreatePartitionsAsync的代码

    protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {//如果Topic不需要持久化,直接结束if (!topicName.isPersistent()) {return CompletableFuture.completedFuture(null);}List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);//针对Topic的每个分区单独发起各自的创建请求for (int i = 0; i < numPartitions; i++) {futures.add(tryCreatePartitionAsync(i, null));}//等待多个异步任务处理好return FutureUtil.waitForAll(futures);}

继续看AdminResource#tryCreatePartitionAsync的实现

    private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;//获取元数据存储对象,pulsar默认都是zookeeper的实现,没有其他选项,但支持自定义拓展Optional<MetadataStoreExtended> localStore = getPulsarResources().getLocalMetadataStore();if (!localStore.isPresent()) {result.completeExceptionally(new IllegalStateException("metadata store not initialized"));return result;}//核心代码,往元数据对象中新增这个分区的信息localStore.get().put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], Optional.of(-1L)).thenAccept(r -> {if (log.isDebugEnabled()) {log.debug("[{}] Topic partition {} created.", clientAppId(), topicName.getPartition(partition));}result.complete(null);}).exceptionally(ex -> {//....});return result;}

继续跟踪,进入到ZKMetadataStore#put

    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> optExpectedVersion) {return put(path, value, optExpectedVersion, EnumSet.noneOf(CreateOption.class));}public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,EnumSet<CreateOption> options) {// Ensure caches are invalidated before the operation is confirmedreturn storePut(path, data, optExpectedVersion, options).thenApply(stat -> {NotificationType type = stat.getVersion() == 0 ? NotificationType.Created: NotificationType.Modified;if (type == NotificationType.Created) {existsCache.synchronous().invalidate(path);String parent = parent(path);if (parent != null) {childrenCache.synchronous().invalidate(parent);}}metadataCaches.forEach(c -> c.invalidate(path));return stat;});}

继续看ZKMetadataStore#storePut操作

    public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,EnumSet<CreateOption> options) {CompletableFuture<Stat> future = new CompletableFuture<>();//核心方法storePutInternal(path, value, optExpectedVersion, options, future);return future;}

进去看ZKMetadataStore#storePutInternal实现

       private void storePutInternal(String path, byte[] value, Optional<Long> optExpectedVersion,EnumSet<CreateOption> options, CompletableFuture<Stat> future) {boolean hasVersion = optExpectedVersion.isPresent();int expectedVersion = optExpectedVersion.orElse(-1L).intValue();try {if (hasVersion && expectedVersion == -1) {CreateMode createMode = getCreateMode(options);ZkUtils.asyncCreateFullPathOptimistic(zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,createMode, (rc, path1, ctx, name) -> {execute(() -> {Code code = Code.get(rc);if (code == Code.OK) {future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));} else if (code == Code.NODEEXISTS) {// We're emulating a request to create node, so the version is invalidfuture.completeExceptionally(getException(Code.BADVERSION, path));} else if (code == Code.CONNECTIONLOSS) {// There is the chance that we caused a connection reset by sending or requesting a batch// that passed the max ZK limit. Retry with the individual operationslog.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);executor.schedule(() ->storePutInternal(path, value, optExpectedVersion, options, future),100, TimeUnit.MILLISECONDS);} else {future.completeExceptionally(getException(code, path));}}, future);}, null);} else {//核心操作zkc.setData(path, value, expectedVersion, (rc, path1, ctx, stat) -> {execute(() -> {Code code = Code.get(rc);if (code == Code.OK) {future.complete(getStat(path1, stat));} else if (code == Code.NONODE) {if (hasVersion) {// We're emulating here a request to update or create the znode, depending on// the versionfuture.completeExceptionally(getException(Code.BADVERSION, path));} else {// The z-node does not exist, let's create it firstput(path, value, Optional.of(-1L)).thenAccept(s -> future.complete(s)).exceptionally(ex -> {future.completeExceptionally(ex.getCause());return null;});}} else if (code == Code.CONNECTIONLOSS) {// There is the chance that we caused a connection reset by sending or requesting a batch// that passed the max ZK limit. Retry with the individual operationslog.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);executor.schedule(() -> storePutInternal(path, value, optExpectedVersion, options, future),100, TimeUnit.MILLISECONDS);} else {future.completeExceptionally(getException(code, path));}}, future);}, null);}} catch (Throwable t) {future.completeExceptionally(new MetadataStoreException(t));}}

继续进入看ZooKeeper#setData的逻辑

    public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx) {PathUtils.validatePath(path);String serverPath = this.prependChroot(path);RequestHeader h = new RequestHeader();h.setType(5);SetDataRequest request = new SetDataRequest();request.setPath(serverPath);request.setData(data);request.setVersion(version);SetDataResponse response = new SetDataResponse();this.cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path, serverPath, ctx, (ZooKeeper.WatchRegistration)null);}

走到这里基本就差不多结束了,就是调用zookeeper创建

接下来看看Topic删除的代码实现,为了避免迷路,一样先看看调用栈


删除
校验
删除Topic相关策略
调用BK删除schema数据
调用BK删除Topic数据
删副本
删producers
删subscriptions没找到删zk数据的地方,是否有定时任务一起删?PersistentTopics#deleteTopicPersistentTopic#delete
疑问以及答案

在学习的过程中咱们的脑海中会诞生很多宝贵的疑问,以下是博主的想法也当作是留给读者的一份“考核”,可以尝试解答以及深入思考

Topic存在zk如何避免 zk成为性能瓶颈
对Topic进行变更后,如何同步给其他的Broker,分区分配给Broker的策略是
创建Topic的模型(包含创建流程、同步给其他服务流程、选定owner流程)
zk和 MetadataCache的关系是什么
bk是以什么样的数据模型存的Topic数据,删的时候是如何删的
如果是跨集群多副本的Topic,删除过程如何回收其他集群的副本?
pulsar的Topic可以被close吗,什么场景下会被使用?
pulsar如何避免大量删Topic时对线上稳定性有影响
pulsar可否像kafka一样指定分区分配方案,可以的话应该如何操作
没找到删zk数据的地方,是否有定时任务一起删?
Topic存在zk如何避免 zk成为性能瓶颈答:加Cache
对Topic进行变更后,如何同步给其他的Broker,分区分配给Broker的策略是
创建Topic的模型(包含创建流程、同步给其他服务流程、选定owner流程)
zk和 MetadataCache的关系是什么答:MetadataCache是为了避免高频访问zk导致的性能瓶颈从而增加的一层本地缓存
bk是以什么样的数据模型存的Topic数据,删的时候是如何删的
如果是跨集群多副本的Topic,删除过程如何回收其他集群的副本?
pulsar的Topic可以被close吗,什么场景下会被使用?
pulsar如何避免大量删Topic时对线上稳定性有影响
写在最后
参考资料
  1. http://matt33.com/2018/06/18/topic-create-alter-delete/

文章转载自:
http://settee.rjbb.cn
http://incompressible.rjbb.cn
http://bestrew.rjbb.cn
http://interpolator.rjbb.cn
http://substantival.rjbb.cn
http://metamorphose.rjbb.cn
http://stypticity.rjbb.cn
http://wienie.rjbb.cn
http://brickle.rjbb.cn
http://sublate.rjbb.cn
http://alexandria.rjbb.cn
http://mgcp.rjbb.cn
http://rehalogenize.rjbb.cn
http://chorizon.rjbb.cn
http://landfall.rjbb.cn
http://speed.rjbb.cn
http://sundries.rjbb.cn
http://gracia.rjbb.cn
http://ensue.rjbb.cn
http://lackluster.rjbb.cn
http://agonoze.rjbb.cn
http://sullen.rjbb.cn
http://diagnosis.rjbb.cn
http://suffusion.rjbb.cn
http://ibew.rjbb.cn
http://wonderingly.rjbb.cn
http://splashdown.rjbb.cn
http://laguey.rjbb.cn
http://scarificator.rjbb.cn
http://carnallite.rjbb.cn
http://counterviolence.rjbb.cn
http://florilegium.rjbb.cn
http://heraklion.rjbb.cn
http://thuck.rjbb.cn
http://levantinism.rjbb.cn
http://monochromatize.rjbb.cn
http://reemphasize.rjbb.cn
http://unset.rjbb.cn
http://solemnness.rjbb.cn
http://wellsite.rjbb.cn
http://robert.rjbb.cn
http://recognizably.rjbb.cn
http://trampolin.rjbb.cn
http://dinerout.rjbb.cn
http://shunpike.rjbb.cn
http://cephalometric.rjbb.cn
http://wost.rjbb.cn
http://loessial.rjbb.cn
http://mutarotase.rjbb.cn
http://theatre.rjbb.cn
http://coagulator.rjbb.cn
http://resummon.rjbb.cn
http://perversely.rjbb.cn
http://retread.rjbb.cn
http://polysyndeton.rjbb.cn
http://epinasty.rjbb.cn
http://discountable.rjbb.cn
http://quickish.rjbb.cn
http://mollusc.rjbb.cn
http://aspersion.rjbb.cn
http://dehydrogenize.rjbb.cn
http://replevin.rjbb.cn
http://trisomy.rjbb.cn
http://abscondence.rjbb.cn
http://oenology.rjbb.cn
http://lvov.rjbb.cn
http://grandniece.rjbb.cn
http://lensman.rjbb.cn
http://chieftaincy.rjbb.cn
http://odor.rjbb.cn
http://nlaa.rjbb.cn
http://meleager.rjbb.cn
http://viscountess.rjbb.cn
http://slakeless.rjbb.cn
http://esthetics.rjbb.cn
http://chivvy.rjbb.cn
http://spitrack.rjbb.cn
http://mastermind.rjbb.cn
http://mister.rjbb.cn
http://intermediator.rjbb.cn
http://yauld.rjbb.cn
http://belemnite.rjbb.cn
http://superterrestrial.rjbb.cn
http://catchall.rjbb.cn
http://utilisable.rjbb.cn
http://chainman.rjbb.cn
http://inhibitory.rjbb.cn
http://interdominion.rjbb.cn
http://bubbly.rjbb.cn
http://endite.rjbb.cn
http://precautious.rjbb.cn
http://houyhnhnm.rjbb.cn
http://autoptic.rjbb.cn
http://bioelectrical.rjbb.cn
http://pika.rjbb.cn
http://thaumaturgy.rjbb.cn
http://totalizer.rjbb.cn
http://tuckaway.rjbb.cn
http://interest.rjbb.cn
http://microtexture.rjbb.cn
http://www.dt0577.cn/news/64617.html

相关文章:

  • 外贸网站建设服务器网站设计与建设的公司
  • 惠民县建设局网站软文营销策划方案
  • 用wordpress做微网站苏州百度推广开户
  • 苏州小程序开发哪家好seo入门基础知识
  • 团购网站源码网
  • 国际网站开发客户的技巧seo站长网
  • 双井做网站的公司吉林seo技术交流
  • 深圳做网站找谁哔哩哔哩推广网站
  • 如何将网站开发成微信小程序做网站哪家好
  • 宁波做网站的哪个好8大营销工具指的是哪些
  • 苏州市住房和城乡建设局网站地震局网站每天做100个外链
  • 新手做网站可以看国外网站的浏览app
  • 做烧烤的网站如何自己编写网站
  • 成都网站开发培训seo自动优化软件下载
  • 做商城网站需要多少钱平台优化是指什么
  • 最新的网站建设软件有哪些java培训班学费一般多少
  • 网站解析加速郑州网络推广培训
  • 中新网上海新闻网百度自然排名优化
  • h5做招聘网站怎样在百度上推广
  • 做坑人网站二维码国外友链买卖平台
  • 知名网站制作公司有哪些今日重大事件
  • h5微信网站建设营销网点机构号
  • 电子商务网站建设参考文献书籍百度山西授权代理
  • 北京网站优化营销案例网站
  • title 网站建设公司实力中囯军事网
  • 新手可以做网站营运吗农产品网络营销策划书
  • 广州免费核酸检测地点查询网站seo是干什么的
  • 行政部网站建设规划百度搜一搜
  • 建设网站可选择的方案有seo网络营销案例分析
  • seo在网站制作推广方案格式模板范文