当前位置: 首页 > news >正文

租赁空间网站建设北京网站sem、seo

租赁空间网站建设,北京网站sem、seo,思途智旅游网站开发,育儿网网站开发目录 一、主从同步工作原理 1. 主从配置 2. 启动HA 二、主从同步实现机制 1. 从Broker发送连接事件 2. 主Broker接收连接事件 3. 从Broker反馈复制进度 4. ReadSocketService线程读取从Broker复制进度 5. WriteSocketService传输同步消息 6. GroupTransferService线程…

目录

一、主从同步工作原理

1. 主从配置

2. 启动HA

二、主从同步实现机制

1. 从Broker发送连接事件

2. 主Broker接收连接事件

3. 从Broker反馈复制进度

4. ReadSocketService线程读取从Broker复制进度

5. WriteSocketService传输同步消息

6. GroupTransferService线程通知HA结果

        1):待需要HA的消息集合

        2):通知消息发送者线程

三、读写分离机制

四、参考资料


一、主从同步工作原理

        为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费, RocketMQ引入Broker主备机制,即:消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。

        下图所示是Broker的HA交互机制流程图及类图。主从同步模式分为:同步、异步。

  • step1:主服务器启动,并在特定端口上监听从服务器的连接;
  • step2:从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接;
  • step3:从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器;
  • step4:从服务器保存消息并继续发送新的消息同步请求。

1. 主从配置

        参考rocketmq-distribution项目的conf目录下有:2主2从异步HA配置(2m-2s-async)、2主2从同步HA配置(2m-2s-sync)。以下1主1从异步HA配置实例如下。

        主Broker配置:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = truenamesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55

        从Broker配置:

        注意:brokerName与主机相同;brokerId > 0时,则为从,0时则为主;brokerRole角色为SLAVE(从),刷盘类型为ASYNC_FLUSH(异步刷盘)。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# 启动SQL过滤
enablePropertyFilter = truenamesrvAddr=192.168.1.55:9876;172.17.0.3:9876

2. 启动HA

        org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法,如下图所示是其调用链及相关HA部分代码。 

/*** broker启动时,消息存储线程* BrokerController#startBasicService()* @throws Exception*/
@Override
public void start() throws Exception {// 是否HA主从复制if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {this.haService.init(this);}......if (this.haService != null) {this.haService.start();}......
}

        org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法,如下代码所示。注意,从Broker的broker.conf配置的brokerRole为SLAVE,才能创建HAClient(从Broker注册到主Broker)

@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {this.defaultMessageStore = defaultMessageStore;this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());this.groupTransferService = new GroupTransferService(this, defaultMessageStore);if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {this.haClient = new DefaultHAClient(this.defaultMessageStore);}this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}

        org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。注意:

  • org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService:主Broker接收从Broker的连接事件;
  • org.apache.rocketmq.store.ha.GroupTransferService:负责主Broker向从Broker发送同步数据;
  • org.apache.rocketmq.store.ha.HAClient:从Broker向主Broker发送连接事件;

        Broker启动时,根据配置brokerRole配置(ASYNC_MASTER、SYNC_MASTER、SLAVE)判定Broker是主还是从。若是Slave角色,在broker配置文件中获取haMasterAddress,并更新至masterAddress;但是haMasterAddress配置为空,则启动成功,但是不会执行HA

/*** 启动HAService* step1:{@link AcceptSocketService}接收从Broker的注册事件,方法是{@link AcceptSocketService#beginAccept()}* step2:启动{@link AcceptSocketService}线程,监听从Broker发送心跳* step3:同步数据{@link GroupTransferService}线程启动,主Broker向从Broker发送数据* step4:启动从Broker{@link HAClient}发送心跳到主Broker*/
@Override
public void start() throws Exception {// 主接收从Broker的连接事件,SelectionKey.OP_ACCEPT(连接事件)this.acceptSocketService.beginAccept();// 启动主Broker线程this.acceptSocketService.start();// 主Broker同步数据线程启动this.groupTransferService.start();this.haConnectionStateNotificationService.start();// 启动从Broker{@link HAClient}发送心跳到主Brokerif (haClient != null) {this.haClient.start();}
}

二、主从同步实现机制

1. 从Broker发送连接事件

        org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类,是个线程。其主要属性如下代码所示。

// Socket读缓存区大小,4M
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主Broker地址
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 从Broker向主Broker发起HA的偏移量
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// 网络传输通道
private SocketChannel socketChannel;
// 事件选择器
private Selector selector;
/*** 上次读取主Broker的时间戳* last time that slave reads date from master.*/
private long lastReadTimestamp = System.currentTimeMillis();
/*** 上次写入主Broker的时间戳* last time that slave reports offset to master.*/
private long lastWriteTimestamp = System.currentTimeMillis();// 反馈HA的复制进度(从Broker的Commitlog文件的最大偏移量)
private long currentReportedOffset = 0;
// 本次处理读缓存区的指针
private int dispatchPosition = 0;
// 读缓存区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 读缓存区备份,与byteBufferRead交换
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA连接状态
private volatile HAConnectionState currentState = HAConnectionState.READY;
// 流监控
private FlowMonitor flowMonitor;

        org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务,其调用链和代码如下。

  • DefaultHAClient#connectMaster():从Broker连接到主Broker
  • DefaultHAClient#transferFromMaster():向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中。  

/*** 启动HAClient* {@link DefaultHAClient#connectMaster()}:从Broker连接到主Broker* {@link DefaultHAClient#transferFromMaster()}:向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中*/
@Override
public void run() {log.info(this.getServiceName() + " service started");this.flowMonitor.start();while (!this.isStopped()) {try {switch (this.currentState) {case SHUTDOWN:return;case READY:if (!this.connectMaster()) {log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());this.waitForRunning(1000 * 5);}continue;case TRANSFER:// 向主发送HA进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中if (!transferFromMaster()) {// 没有可拉取消息时,设置READY状态closeMasterAndWait();continue;}break;default:this.waitForRunning(1000 * 2);continue;}long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress+ "] expired, " + interval);this.closeMaster();log.warn("AutoRecoverHAClient, master not response some time, so close connection");}} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);this.closeMasterAndWait();}}log.info(this.getServiceName() + " service end");
}

        注意,一旦HAClient线程启动后,在状态READY、TRANSFER来回变化,READY状态下:发送从Broker连接事件到主Broker,开启Socket连接;TRANSFER状态下:主从发送相关数据信息,如:从向主发送HA复制进度(currentReportedOffset,即:从Broker的Commitlog文件的最大偏移量);主向从发送同步消息

        org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法,其代码如下。

/*** 从Broker连接到主Broker* 注意:*  a. Broker启动时,若是Slave角色,从broker配置文件中获取haMasterAddress,并更新至masterAddress;*  b. 若是Slave角色,但是haMasterAddress配置为空,则启动成功,但是不会执行HA* @return true连接成功;false连接失败*/
public boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {// 获取主Broker地址String addr = this.masterHaAddress.get();if (addr != null) {// 根据地址创建SocketAddress对象SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);// 获取SocketChannelthis.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {// SocketChannel注册OP_READ(网络读事件)this.socketChannel.register(this.selector, SelectionKey.OP_READ);log.info("HAClient connect to master {}", addr);this.changeCurrentState(HAConnectionState.TRANSFER);}}// 获取Commitlog最大偏移量(HA同步进度)this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();this.lastReadTimestamp = System.currentTimeMillis();}return this.socketChannel != null;
}

2. 主Broker接收连接事件

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类,是一个线程。其主要属性如下代码所示。

// 主Broker监听本地的Socket(本地IP + 端口号)
private final SocketAddress socketAddressListen;
// Socket通道,基于NIO
private ServerSocketChannel serverSocketChannel;
// 事件选择器,基于NIO
private Selector selector;

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件

/*** 启动监听从broker的连接* Starts listening to slave connections.** @throws Exception If fails.*/
public void beginAccept() throws Exception {this.serverSocketChannel = ServerSocketChannel.open();this.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true); // TCP可重复使用this.serverSocketChannel.socket().bind(this.socketAddressListen); // 绑定监听端口if (0 == messageStoreConfig.getHaListenPort()) {messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());}this.serverSocketChannel.configureBlocking(false); // 非阻塞模式this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 注册OP_ACCEPT(连接事件)
}

        org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理,为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动(负责M-S的数据同步逻辑)

/*** 标准的NIO连接处理方式* step1:选择器每1s处理一次连接就绪事件* step2:是否连接事件,若是,创建{@link SocketChannel}* step3:每一个连接创建{@link HAConnection}(负责M-S的数据同步逻辑)*/
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 选择器每1s处理一次连接就绪事件this.selector.select(1000);Set<SelectionKey> selected = this.selector.selectedKeys();if (selected != null) {for (SelectionKey k : selected) {// 是否连接事件if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {// 若是连接事件时,创建SocketChannelSocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {DefaultHAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {// 每一个连接创建HAConnection并启动(负责M-S的数据同步逻辑)HAConnection conn = createConnection(sc);conn.start();DefaultHAService.this.addConnection(conn);} catch (Exception e) {log.error("new HAConnection exception", e);sc.close();}}} else {log.warn("Unexpected ops in select " + k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() + " service has exception.", e);}}log.info(this.getServiceName() + " service end");
}

        org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时,启动读、写线程服务。其关键属性如下代码所示。

  • private WriteSocketService writeSocketService:主Broker向从Broker写数据服务类
  • private ReadSocketService readSocketService:主Broker读取从Broker数据服务类
private final DefaultHAService haService;
private final SocketChannel socketChannel;
// HA客户端连接地址
private final String clientAddress;
// 主Broker向从Broker写数据服务类
private WriteSocketService writeSocketService;
// 主Broker读取从Broker数据服务类
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// 从Broker请求拉取的偏移量
private volatile long slaveRequestOffset = -1;
// 从Broker反馈已完成的偏移量
private volatile long slaveAckOffset = -1;
private FlowMonitor flowMonitor;

3. 从Broker反馈复制进度

        org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法,代码如下所示,该方法有两大功能:

  • 从Broker向主Broker:反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量),方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。
  • 从Broker接收主Broker:HA同步消息内容,方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。
/*** 向主反馈HA复制进度;处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中* {@link DefaultHAClient#reportSlaveMaxOffset(long)}:向主反馈HA复制进度,即:currentReportedOffset(从Broker的Commitlog文件的最大偏移量)* {@link DefaultHAClient#processReadEvent()}:处理从主Broker发送过来的消息,并commit所有消息追加到Commitlog文件内存映射缓存中*/
private boolean transferFromMaster() throws IOException {boolean result;// 判断是否需要向主Broker反馈当前待拉取偏移量if (this.isTimeToReportOffset()) {log.info("Slave report current offset {}", this.currentReportedOffset);// 向主Broker反馈拉取偏移量result = this.reportSlaveMaxOffset(this.currentReportedOffset);if (!result) {return false;}}this.selector.select(1000);// 处理主Broker发送过来的消息数据result = this.processReadEvent();if (!result) {return false;}return reportSlaveMaxOffsetPlus();
}

        org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度,代码如下。

/*** 向主Broker反馈拉取偏移量* 注意:*      a. 向主Broker反馈拉取偏移量maxOffset: 对于Slave端:发送下次待拉取消息偏移量*                                        对于Master端:本次请求拉取的偏移量,也可以理解为同步ACK*      b. 手动切换ByteBuffer的写模式/读模式;*      c. 通过{@link Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel(基于NIO模式的写范例)* @param maxOffset HA待拉取偏移量* @return ByteBuffer缓存的内容是否写完*/
private boolean reportSlaveMaxOffset(final long maxOffset) {// 偏移量写入ByteBufferthis.reportOffset.position(0); // 写缓存位置this.reportOffset.limit(8); // 写缓存字节长度this.reportOffset.putLong(maxOffset); // 偏移量写入ByteBuffer// 将ByteBuffer的写模式 转为 读模式this.reportOffset.position(0);this.reportOffset.limit(8);// 循环,并判定ByteBuffer是否完全写入SocketChannelfor (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();
}

4. ReadSocketService线程读取从Broker复制进度

        org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求,获取内容是HA复制进度。其代码如下,看出主Broker获取从Broker的HA复制进度后,赋值给DefaultHAConnection#slaveRequestOffset属性,后立即唤醒GroupTransferService线程,执行消息同步

/*** 主Broker读取从Broker拉取消息的请求* step1:判定byteBufferRead是否有剩余空间,没有则{@link Buffer#flip()}* step2:用剩余空间,从SocketChannel读数据到缓存中;读取到的内容是从Broker拉取消息的偏移量* step3:通知等待同步HA复制结果的发送消息线程* @return*/
private boolean processReadEvent() {int readSizeZeroTimes = 0;/*byteBufferRead没有剩余空间时,则:position == limit == capacity调用flip()方法后,则:position == 0, limit == capacity,加上processPosition = 0,说明从头开始处理*/if (!this.byteBufferRead.hasRemaining()) {this.byteBufferRead.flip(); // ByteBuffer重置处理this.processPosition = 0;}// ByteBuffer有剩余空间,循环至byteBufferRead没有剩余空间while (this.byteBufferRead.hasRemaining()) {try {// 从SocketChannel读数据到缓存中int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0; // 重置this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();// 读取内容长度 >= 8,说明收到从Broker的拉取请求(内容是offset)if ((this.byteBufferRead.position() - this.processPosition) >= 8) {int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);long readOffset = this.byteBufferRead.getLong(pos - 8);this.processPosition = pos;// 从Broker反馈已完成的偏移量DefaultHAConnection.this.slaveAckOffset = readOffset;// 更新从Broker请求拉取的偏移量if (DefaultHAConnection.this.slaveRequestOffset < 0) {DefaultHAConnection.this.slaveRequestOffset = readOffset;log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);}// 通知等待同步HA复制结果的发送消息线程DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);}} else if (readSize == 0) {// 连续读取字节数0,则终止本次读取处理if (++readSizeZeroTimes >= 3) {break;}} else {log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");return false;}} catch (IOException e) {log.error("processReadEvent exception", e);return false;}}return true;
}

5. WriteSocketService传输同步消息

        ReadSocketService线程读取从Broker发送的HA复制进度,由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。其如下代码所示WriteSocketService#run方法,是同步消息核心逻辑。

/*** 传输消息内容到HA客户端* step1:slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件* step2:nextTransferFromWhere为-1时,说明初次传输,计算nextTransferFromWhere(待传输offset)* step3:判断上次是否传输完*        上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,*                  发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)*        上次传输没有完成:继续传输,忽略本次写事件* step4:根据从Broker待拉取消息offset查找之后的所有可读消息* step5:待同步消息总大小 > 一次传输的大小,默认32KB,则截取,此时一次传输不是完整的消息* step6:传输消息内容*/
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);// slaveRequestOffset为-1时,说明主Broker没有收到从Broker的拉取请求,忽略本次写事件if (-1 == DefaultHAConnection.this.slaveRequestOffset) {Thread.sleep(10);continue;}/*nextTransferFromWhere为-1时,说明初次传输初次传输时,计算nextTransferFromWhere(待传输offset)*/// 初次传输if (-1 == this.nextTransferFromWhere) {// =0 时,从Commitlog文件的最大偏移量传输if (0 == DefaultHAConnection.this.slaveRequestOffset) {long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset =masterOffset- (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if (masterOffset < 0) {masterOffset = 0;}this.nextTransferFromWhere = masterOffset;}// !=0 时,从Broker请求的偏移量else {this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;}log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress+ "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);}/*判断上次是否传输完上次传输完:当前时间 与 上次传输时间的间隔 > 发送心跳包时间间隔,发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)上次传输没有完成:继续传输,忽略本次写事件*/// lastWriteOver为true,则上次传输完if (this.lastWriteOver) {// 当前时间 与 上次传输时间的间隔long interval =DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;// 时间间隔 > 发送心跳包时间间隔if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// Build Header 发送心跳包长度: nextTransferFromWhere + size(消息长度0,避免长连接由空闲而关闭)this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}}// lastWriteOver为false,则上次传输没有完成,则继续传输else {// 继续传输上次拉取请求,还未完成,则忽略本次写事件this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}// 根据从Broker待拉取消息offset查找之后的所有可读消息SelectMappedBufferResult selectResult =DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) {int size = selectResult.getSize();// 待同步消息总大小 > 一次传输的大小,默认32KBif (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();if (size > canTransferMaxBytes) {if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));lastPrintTimestamp = System.currentTimeMillis();}size = canTransferMaxBytes;}long thisOffset = this.nextTransferFromWhere;this.nextTransferFromWhere += size; // 下一次写入的offsetselectResult.getByteBuffer().limit(size);this.selectMappedBufferResult = selectResult;// Build Header 传输size大小的消息内容(不一定是完整的消息)this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver = this.transferData();} else {DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}} catch (Exception e) {DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if (this.selectMappedBufferResult != null) {this.selectMappedBufferResult.release();}changeCurrentState(HAConnectionState.SHUTDOWN);this.makeStop();readSocketService.makeStop();haService.removeConnection(DefaultHAConnection.this);SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {DefaultHAConnection.log.error("", e);}DefaultHAConnection.log.info(this.getServiceName() + " service end");
}

6. GroupTransferService线程通知HA结果

        org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后,通知阻塞的消息发送者线程。同步主从Broker模式,即:消息刷磁盘后,继续等待新消息被传输到从Broker,等待传输结果,并通知消息发送线程

        1):待需要HA的消息集合

        org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息,参考《RocketMQ5.0.0消息存储<二>_消息存储流程》,该方法会根据同步或异步模式(默认)来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法(完成刷盘和HA复制),方法调用链如下。

        生产者把消息发送到Broker,完成commit操作(消息提交到文件内存映射中) ,随后根据同步/异步模式完成刷盘和HA。HA操作时,把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。  

private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,int needAckNums) {if (needAckNums >= 0 && needAckNums <= 1) {return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}HAService haService = this.defaultMessageStore.getHaService();long nextOffset = result.getWroteOffset() + result.getWroteBytes();// Wait enough acks from different slavesGroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);haService.putRequest(request);haService.getWaitNotifyObject().wakeupAll();return request.future();
}

        2):通知消息发送者线程

        HAService启动时,会启动GroupTransferService线程。GroupTransferService#run执行任务,如下代码所示。

@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 间隔10sthis.waitForRunning(10);// 主从同步复制结束后,通知阻塞的消息发送者线程this.doWaitTransfer();} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");
}

        其中执行waitForRunning()方法时,会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法,使得requestsWrite与requestsRead两个集合对调

  • private volatile List<CommitLog.GroupCommitRequest> requestsWrite:主Broker待需要HA的消息集合
  • private volatile List<CommitLog.GroupCommitRequest> requestsRead:主Broker正在执行的HA集合
private void swapRequests() {List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;
}

        org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后,通知阻塞的消息发送者线程,如下代码所示。

/*** 主从同步复制结束后,通知阻塞的消息发送者线程* step1:遍历消息提交请求(内存提交到Commitlog文件的内存映射)* step2:判断主从同步成功:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量*/
private void doWaitTransfer() {// 加锁synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {// commit请求,即:内存提交到Commitlog文件的内存映射for (CommitLog.GroupCommitRequest req : this.requestsRead) {boolean transferOK = false;long deadLine = req.getDeadLine(); // 是否超时final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // 是否超时if (i > 0) {// 等待1sthis.notifyTransferObject.waitForRunning(1000);}if (!allAckInSyncStateSet && req.getAckNums() <= 1) {// 主从同步成功判断:从已经成功复制的最大偏移量 >= 消息生产者发送消息后返回下一条消息的偏移量transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();continue;}if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {// In this mode, we must wait for all replicas that in InSyncStateSet.final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();if (syncStateSet.size() <= 1) {// Only mastertransferOK = true;break;}// Include masterint ackNums = 1;for (HAConnection conn : haService.getConnectionList()) {final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {ackNums++;}if (ackNums >= syncStateSet.size()) {transferOK = true;break;}}} else {// Include masterint ackNums = 1;for (HAConnection conn : haService.getConnectionList()) {// TODO: We must ensure every HAConnection represents a different slave// Solution: Consider assign a unique and fixed IP:ADDR for each different slaveif (conn.getSlaveAckOffset() >= req.getNextOffset()) {ackNums++;}if (ackNums >= req.getAckNums()) {transferOK = true;break;}}}}if (!transferOK) {log.warn("transfer message to slave timeout, offset : {}, request acks: {}",req.getNextOffset(), req.getAckNums());}// 从完成复制后,唤醒消息发送者线程req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}}
}

三、读写分离机制

        RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取

        RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。Broker组织中根据brokerName获取一组Broker服务器(M-S),它们的brokerName相同但brokerId不同,主服务器的brokerId为0,从服务器的brokerId大于0。 其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe

        详细消费拉取消息时,实现读写分离机制见后续章节,参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》。

四、参考资料

【RocketMQ】学习RocketMQ必须要知道的主从同步原理_午睡的猫…的博客-CSDN博客_rocketmq主从同步原理

【RocketMQ】主从同步实现原理 - shanml - 博客园

RocketMQ5.0.0消息存储<二>_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储<三>_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客 

RocketMQ5.0.0消息存储<四>_刷盘机制_爱我所爱0505的博客-CSDN博客

http://www.dt0577.cn/news/3922.html

相关文章:

  • 制作网站公司谁家好怎么自己做网站
  • 做网站的工作记录上海今天刚刚发生的新闻
  • 二手房网签合同在哪个网站做厦门seo顾问屈兴东
  • 代理记账网站模板网盘资源大全
  • 地下城做解封任务的网站头条搜索是百度引擎吗
  • php网站后台密码破解程序个人在百度上发广告怎么发
  • 青岛网站建设方案案例站长之家网站
  • 大良营销网站建设服务东莞营销网站建设直播
  • 网络研发工程师专业黑帽seo
  • 广州网站建设公司有哪些seo免费优化工具
  • 网站被做跳转怎么办网络营销课程个人感悟
  • ai做网站优化营商环境建议
  • dream8网站建设及设计推广普通话手抄报一等奖
  • 做网站建网站1688官网入口
  • wordpress关闭发表评论对seo的理解
  • 一建建设网站新闻头条今日要闻最新
  • 政府网站建设服务域名查询138ip
  • 企业网站的建设与维护最近新闻热点
  • 手机4g建立网站百度竞价开户渠道
  • 怎么跟网站建设公司谈灰色行业推广渠道
  • 转移wordpress东莞关键词优化软件
  • 长春网站建设优化排名100大看免费行情的软件
  • 自己建的网站能用吗营销软文300字范文
  • 网站工作室设计网站收录查询工具
  • Linux做视频网站网速均衡seo海外推广
  • 外贸网站营销推广武汉标兵seo
  • 深圳网站定制建设搜索营销
  • 阿里云ecs服务器建设网站百度有免费推广广告
  • 购物网站建设成本推广公司运营模式
  • 兼职网站制作百度广告推广费用一年多少钱