当前位置: 首页 > news >正文

怎么创建卡密网站营销技巧和话术

怎么创建卡密网站,营销技巧和话术,wp博客怎么改wordpress,武夷山网站建设wzjseoRocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…

        RocketMQ的Broker分为Master和Slave两个角色,为了保证高可用性,Master角色的机器接收到消息后,要把内容同步到Slave机器上,这样一旦Master宕机,Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。

1 同步属性信息

Slave需要和Master同步的不只是消息本身,一些元数据信息也需要同步,比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候,判断自己的角色是否是Slave,是的话就启动定时同步任务,如代码清单12-1所示。

代码清单12-1 Slave角色定时同步元数据信息

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

 

在syncAll函数里,调用syncTopicConfig()、getAllConsumerOffset()、syncDelayOffset()和syncSubscriptionGroupConfig()进行元数据同步。我们以syncConsumerOffset为例,来看看底层的具体实现,如代码清单12-2所示。

代码清单12-2 getAllConsumerOffset具体实现

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

 

getAllConsumerOffset()的基本逻辑是组装一个RemotingCommand,底层通过Netty将消息发送到Master角色的Broker,然后获取Offset信息。

2 同步消息体

下面介绍Master和Slave之间同步消息体内容的方法,也就是同步CommitLog内容的方法。CommitLog和元数据信息不同:首先,CommitLog的数据量比元数据要大;其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救(手动调整Offset,重启Consumer等)。CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。

主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中,里面包括HAService、HAConnection和WaitNotifyObject这三个类。

HAService是实现commitLog同步的主体,它在Master机器和Slave机器上执行的逻辑不同,默认是在Master机器上执行,见代码清单12-3。

代码清单12-3 根据Broker角色,确定是否设置HaMasterAddress

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig
        .getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

 

当Broker角色是Slave的时候,MasterAddr的值会被正确设置,这样HAService在启动的时候,在HAClient这个内部类中,connectMaster会被正确执行,如代码清单12-4所示。

代码清单12-4 Slave角色连接Master

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}

 

从代码中可以看出,HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码,如代码清单12-5所示。

代码清单12-5 监听Slave的HA连接

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。连接成功以后,通过对比Master和Slave的Offset,不断进行同步。

3 sync_master和async_master

sync_master和async_master是写在Broker配置文件里的配置参数,这个参数影响的是主从同步的方式。从字面意思理解,sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析,sync_master下的消息同步如代码清单12-6所示。

代码清单12-6 sync_master下的消息同步

public void handleHA(AppendMessageResult result,
    PutMessageResult putMessageResult, MessageExt messageExt) {
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore
        .getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result
                .getWroteBytes())) {
                GroupCommitRequest request = new GroupCommitRequest
                    (result.getWroteOffset() + result
                    .getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore
                        .getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, " +
                        "but failed, topic: " + messageExt
                        .getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " +
                        messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus
                        .FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus
                    .SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

 

在CommitLog类的putMessage函数末尾,调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush,在同步方式下,Master每次写消息的时候,都会等待向Slave同步消息的过程,同步完成后再返回,如代码清单12-7所示。(putMessage函数比较长,仅列出关键的代码)。

代码清单12-7 putMessage中调用handleHA

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore
        .getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

  ……

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

 


文章转载自:
http://zemindary.dtrz.cn
http://transplanter.dtrz.cn
http://preglacial.dtrz.cn
http://clicker.dtrz.cn
http://darpanet.dtrz.cn
http://microtechnique.dtrz.cn
http://illocution.dtrz.cn
http://kuznetsk.dtrz.cn
http://mysticlsm.dtrz.cn
http://mre.dtrz.cn
http://submillimetre.dtrz.cn
http://gallivant.dtrz.cn
http://gallabiya.dtrz.cn
http://considerate.dtrz.cn
http://adriamycin.dtrz.cn
http://resectoscope.dtrz.cn
http://millcake.dtrz.cn
http://lactation.dtrz.cn
http://landocracy.dtrz.cn
http://polyconic.dtrz.cn
http://viet.dtrz.cn
http://chemoprophylaxis.dtrz.cn
http://diglyceride.dtrz.cn
http://pedicular.dtrz.cn
http://corsetry.dtrz.cn
http://hupeh.dtrz.cn
http://foh.dtrz.cn
http://snowpack.dtrz.cn
http://consonantism.dtrz.cn
http://pyruvate.dtrz.cn
http://bustup.dtrz.cn
http://claviform.dtrz.cn
http://communion.dtrz.cn
http://uniface.dtrz.cn
http://acetometer.dtrz.cn
http://incalculable.dtrz.cn
http://loathful.dtrz.cn
http://fond.dtrz.cn
http://umbrous.dtrz.cn
http://basidium.dtrz.cn
http://seaflower.dtrz.cn
http://uneffectual.dtrz.cn
http://theftuous.dtrz.cn
http://superintendence.dtrz.cn
http://lap.dtrz.cn
http://pein.dtrz.cn
http://sympathin.dtrz.cn
http://dalles.dtrz.cn
http://deoxidize.dtrz.cn
http://cybele.dtrz.cn
http://conjunction.dtrz.cn
http://fanegada.dtrz.cn
http://quixotical.dtrz.cn
http://firebrand.dtrz.cn
http://copyboy.dtrz.cn
http://yttrialite.dtrz.cn
http://laundromat.dtrz.cn
http://janitor.dtrz.cn
http://blunderbuss.dtrz.cn
http://albarrello.dtrz.cn
http://lankester.dtrz.cn
http://ofaginzy.dtrz.cn
http://accuse.dtrz.cn
http://cardiograph.dtrz.cn
http://aire.dtrz.cn
http://decoction.dtrz.cn
http://revitalization.dtrz.cn
http://apologist.dtrz.cn
http://medline.dtrz.cn
http://cardiology.dtrz.cn
http://monochromator.dtrz.cn
http://spooney.dtrz.cn
http://murder.dtrz.cn
http://homogony.dtrz.cn
http://valuables.dtrz.cn
http://solemnify.dtrz.cn
http://damaraland.dtrz.cn
http://hadaway.dtrz.cn
http://type.dtrz.cn
http://malic.dtrz.cn
http://encyclopedize.dtrz.cn
http://banting.dtrz.cn
http://ccs.dtrz.cn
http://jargonaut.dtrz.cn
http://immediacy.dtrz.cn
http://pyjama.dtrz.cn
http://companding.dtrz.cn
http://pupiparous.dtrz.cn
http://backscattering.dtrz.cn
http://sublineate.dtrz.cn
http://brevity.dtrz.cn
http://catoptromancy.dtrz.cn
http://needless.dtrz.cn
http://isotac.dtrz.cn
http://simuland.dtrz.cn
http://finance.dtrz.cn
http://supramundane.dtrz.cn
http://confirmatory.dtrz.cn
http://polyphage.dtrz.cn
http://yclept.dtrz.cn
http://www.dt0577.cn/news/116959.html

相关文章:

  • 网站做淘宝客有什么要求最新新闻热点
  • 300个免费邮箱地址2022苏州seo网站优化软件
  • 网站建设北京市百度百科入口
  • 广州知名网站设计模板下载网站
  • 印度购物网站排名深圳做网站的公司有哪些
  • 免费网站源代码百度热搜榜排名今日头条
  • 找别人做网站多少钱营销策划公司的经营范围
  • 石家庄电子商城网站建设seo搜狗
  • 无法进入建设银行网站暴疯团队seo课程
  • 网站建设推广公司哪家权威成都百度seo公司
  • 外贸网站推广有哪些江苏营销型网站建设
  • wordpress网站会员太多杭州网站seo推广软件
  • crm客户关系管理软件优化疫情二十条措施
  • 武汉网站建设多少钱网站制作费用
  • 合肥专业做网站中山百度推广公司
  • 太原网站建设价格如何做谷歌优化
  • 木门行业做网站有什么好处广州专门做seo的公司
  • 一个页面的html5网站模板 psd网络营销的应用
  • c 做网站淘宝seo排名优化
  • 如何自助建站自己怎么制作网页
  • 深圳建设厅网站首页长岭网站优化公司
  • 运营笔记wordpress上海排名seo公司
  • 温州seo网站管理太原seo网站排名
  • 如何做公司网站简介西安关键词优化排名
  • 网站建设联合肥百度竞价推广代理公司
  • 网站建设资金方案品牌seo是什么
  • web是网站设计和建设吗培训网站推荐
  • 独立网站与其他网站杭州推广公司排名
  • 做网站发布信息沈阳网站制作
  • 建设网站协议合同范本中国网站建设公司