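Contents
I. How master-slave replication works
1. Master-slave configuration
2. Starting HA
II. Implementation of master-slave replication
1. The slave Broker initiates the connection
2. The master Broker accepts the connection
3. The slave Broker reports replication progress
4. The ReadSocketService thread reads the slave's replication progress
5. WriteSocketService transfers messages to the slave
6. The GroupTransferService thread reports the HA result
1) The collection of requests awaiting HA
2) Notifying the message-sending threads
III. Read/write separation
IV. References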
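I. How master-slave replication works
To improve the availability of message consumption and to prevent a single failed Broker from leaving the messages stored on it unconsumable, RocketMQ introduces a Broker master/slave mechanism: messages that reach the master server are replicated to the slave server, so that if the master Broker goes down, consumers can still pull messages from the slave.
The figure below shows the flow of the Broker HA interaction together with the related class diagram. Master-slave replication runs in one of two modes: synchronous or asynchronous.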
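- step1: The master starts and listens for slave connections on a dedicated port;
- step2: The slave actively connects to the master; the master accepts the connection and a TCP connection is established between them;
- step3: The slave sends the master the offset from which it wants to pull messages; the master parses the request and returns messages to the slave;
- step4: The slave stores the messages and keeps sending new replication requests.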
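The four steps can be illustrated with a toy, in-memory model. This is a sketch under stated assumptions, not RocketMQ code: the class HaLoopSketch and its methods are hypothetical, a plain byte[] stands in for the master's CommitLog, a ByteArrayOutputStream stands in for the slave's CommitLog, and there is no networking. What it does keep is the protocol shape: the slave always reports "the offset I have reached", and the master answers with at most one batch starting there.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical in-memory model of the four-step HA loop; not RocketMQ code.
class HaLoopSketch {
    // Master side (step3): return up to batchSize bytes of the "CommitLog" starting at the requested offset
    static byte[] serve(byte[] masterLog, long requestOffset, int batchSize) {
        int from = (int) Math.min(requestOffset, masterLog.length);
        int to = Math.min(from + batchSize, masterLog.length);
        return Arrays.copyOfRange(masterLog, from, to);
    }

    // Slave side: report the current offset, append what the master returns, repeat (steps 3 and 4)
    static byte[] replicate(byte[] masterLog, int batchSize) {
        ByteArrayOutputStream slaveLog = new ByteArrayOutputStream();
        while (true) {
            long reportedOffset = slaveLog.size();            // "next offset to pull"
            byte[] chunk = serve(masterLog, reportedOffset, batchSize);
            if (chunk.length == 0) {
                break;                                         // caught up with the master
            }
            slaveLog.write(chunk, 0, chunk.length);            // store, then ask again
        }
        return slaveLog.toByteArray();
    }

    public static void main(String[] args) {
        byte[] masterLog = "hello-rocketmq-ha".getBytes();
        System.out.println(new String(replicate(masterLog, 4))); // hello-rocketmq-ha
    }
}
```

Because the slave's own log length doubles as its progress report, a reconnecting slave resumes exactly where it stopped; the real HAClient uses the maximum CommitLog offset for the same purpose.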
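1. Master-slave configuration
The conf directory of the rocketmq-distribution project contains a 2-master/2-slave asynchronous HA configuration (2m-2s-async) and a 2-master/2-slave synchronous HA configuration (2m-2s-sync). Below is an example of a 1-master/1-slave asynchronous HA configuration.
Master Broker configuration: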
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Enable SQL (property) filtering
enablePropertyFilter = true
namesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55
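Slave Broker configuration:
Note: brokerName must be the same as the master's; a brokerId greater than 0 denotes a slave and 0 denotes the master; brokerRole is SLAVE and flushDiskType is ASYNC_FLUSH (asynchronous flushing).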
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# Enable SQL (property) filtering
enablePropertyFilter = true
namesrvAddr=192.168.1.55:9876;172.17.0.3:9876
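The role rules in the note above (same brokerName, brokerId 0 for the master and greater than 0 for a slave, brokerRole SLAVE) can be checked mechanically. The helper below is hypothetical, not part of RocketMQ; it assumes broker.conf is in java.util.Properties format (`key = value`, `#` comments), which matches the examples above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper (not part of RocketMQ): reads broker.conf-style text and applies the role rules above.
class BrokerConfSketch {
    static Properties load(String confText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(confText)); // "key = value" lines, '#' starts a comment
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected for an in-memory reader
        }
        return p;
    }

    // brokerId 0 => master, brokerId > 0 => slave
    static boolean isMaster(Properties p) {
        return Long.parseLong(p.getProperty("brokerId").trim()) == 0;
    }

    // A slave must share the master's brokerName and use brokerRole = SLAVE
    static boolean isValidSlaveOf(Properties slave, Properties master) {
        return !isMaster(slave)
            && "SLAVE".equals(slave.getProperty("brokerRole"))
            && slave.getProperty("brokerName").equals(master.getProperty("brokerName"));
    }
}
```

Loading the text also shows why `enablePropertyFilter` and `namesrvAddr` must sit on separate lines: written as one line, the whole remainder becomes the value of `enablePropertyFilter` and `namesrvAddr` is never set.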
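2. Starting HA
org.apache.rocketmq.store.DefaultMessageStore#start starts the message store during Broker startup; the figure below shows its call chain, followed by the HA-related part of the code.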
/**
 * Message store startup, called when the broker starts
 * BrokerController#startBasicService()
 * @throws Exception
 */
@Override
public void start() throws Exception {
    // Use HA master-slave replication only when DLedger commitlog and duplication are disabled
    if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
        this.haService.init(this);
    }
    ......
    if (this.haService != null) {
        this.haService.start();
    }
    ......
}
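org.apache.rocketmq.store.ha.DefaultHAService#init initializes the HAService, as shown below. Note that an HAClient (through which the slave registers with the master Broker) is created only when brokerRole in the slave Broker's broker.conf is SLAVE.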
@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
    this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        this.haClient = new DefaultHAClient(this.defaultMessageStore);
    }
    this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}
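org.apache.rocketmq.store.ha.DefaultHAService#start starts the HAService. Note:
- org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService: the master Broker accepts connections from slave Brokers;
- org.apache.rocketmq.store.ha.GroupTransferService: on the master Broker, tracks synchronous-replication requests and wakes up the waiting message-sending threads once the data has reached the slave(s);
- org.apache.rocketmq.store.ha.HAClient: the slave Broker's client, which connects to the master Broker;
At startup, the Broker decides whether it is a master or a slave from its brokerRole setting (ASYNC_MASTER, SYNC_MASTER, or SLAVE). For a SLAVE, haMasterAddress is read from the broker configuration file and used to update masterAddress; if haMasterAddress is empty, the Broker still starts successfully, but no HA connection is made while no master address is known.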
/**
 * Start the HAService
 * step1: {@link AcceptSocketService} listens for slave Broker connections, see {@link AcceptSocketService#beginAccept()}
 * step2: start the {@link AcceptSocketService} thread, which handles connections from slave Brokers
 * step3: start the {@link GroupTransferService} thread, which reports replication results to waiting senders
 * step4: on a slave Broker, start {@link HAClient}, which connects and reports to the master Broker
 */
@Override
public void start() throws Exception {
    // Master: listen for slave connection events (SelectionKey.OP_ACCEPT)
    this.acceptSocketService.beginAccept();
    // Start the master's accept thread
    this.acceptSocketService.start();
    // Start the thread that tracks replication results
    this.groupTransferService.start();
    this.haConnectionStateNotificationService.start();
    // On a slave Broker, start {@link HAClient} to connect and report to the master
    if (haClient != null) {
        this.haClient.start();
    }
}
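II. Implementation of master-slave replication
1. The slave Broker initiates the connection
org.apache.rocketmq.store.ha.DefaultHAClient is the core class through which the slave Broker connects to the master Broker; it runs as a thread. Its main fields are shown below.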
// Socket read buffer size: 4 MB
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// Master Broker addresses
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// Buffer for the offset the slave Broker reports to the master Broker for HA
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// Network channel
private SocketChannel socketChannel;
// NIO selector
private Selector selector;
/**
 * Last time the slave read data from the master.
 */
private long lastReadTimestamp = System.currentTimeMillis();
/**
 * Last time the slave reported its offset to the master.
 */
private long lastWriteTimestamp = System.currentTimeMillis();
// Reported HA replication progress (maximum offset of the slave's CommitLog)
private long currentReportedOffset = 0;
// Position up to which the read buffer has been processed
private int dispatchPosition = 0;
// Read buffer
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// Backup read buffer, swapped with byteBufferRead
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA connection state
private volatile HAConnectionState currentState = HAConnectionState.READY;
// Flow monitor (tracks transfer throughput)
private FlowMonitor flowMonitor;
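org.apache.rocketmq.store.ha.DefaultHAClient#run is the HAClient's main loop; its call chain and code are shown below.
- DefaultHAClient#connectMaster(): connects the slave Broker to the master Broker.
- DefaultHAClient#transferFromMaster(): reports the HA progress to the master, processes the messages sent by the master, and appends them to the CommitLog's memory-mapped buffers.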
/**
 * Run the HAClient
 * {@link DefaultHAClient#connectMaster()}: connect the slave Broker to the master Broker
 * {@link DefaultHAClient#transferFromMaster()}: report HA progress to the master; process messages sent by the master and append them to the CommitLog's memory-mapped buffers
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    this.flowMonitor.start();
    while (!this.isStopped()) {
        try {
            switch (this.currentState) {
                case SHUTDOWN:
                    return;
                case READY:
                    if (!this.connectMaster()) {
                        log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                        this.waitForRunning(1000 * 5);
                    }
                    continue;
                case TRANSFER:
                    // Report HA progress to the master; process messages from the master and append them to the CommitLog
                    if (!transferFromMaster()) {
                        // Transfer failed: close the connection (the client returns to READY) and wait
                        closeMasterAndWait();
                        continue;
                    }
                    break;
                default:
                    this.waitForRunning(1000 * 2);
                    continue;
            }
            long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
            if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("AutoRecoverHAClient, master not response some time, so close connection");
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.closeMasterAndWait();
        }
    }
    log.info(this.getServiceName() + " service end");
}
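Note that once the HAClient thread has started, it moves back and forth between the READY and TRANSFER states. In READY, the slave Broker connects to the master Broker and opens a socket connection; in TRANSFER, the two exchange data: the slave sends the master its HA replication progress (currentReportedOffset, i.e. the maximum offset of the slave's CommitLog), and the master sends the slave the messages to replicate.
org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster is the core method through which the slave Broker connects to the master Broker: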
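The READY/TRANSFER cycle described above can be reduced to a small state machine. The sketch below is hypothetical and simplified: the boolean parameters stand in for the results of connectMaster() and transferFromMaster(), and sleeping, housekeeping timeouts and socket handling are left out.

```java
// Hypothetical, simplified model of the HAClient state loop; not RocketMQ code.
class HaClientStateSketch {
    // Names mirror the HAConnectionState values used above
    enum State { READY, TRANSFER, SHUTDOWN }

    State state = State.READY;

    // One pass of the run() loop; 'connected' and 'transferOk' stand in for
    // the results of connectMaster() and transferFromMaster()
    void step(boolean connected, boolean transferOk) {
        switch (state) {
            case READY:
                if (connected) {
                    state = State.TRANSFER; // socket open: start exchanging offsets and data
                }
                break;                      // otherwise stay READY and retry later
            case TRANSFER:
                if (!transferOk) {
                    state = State.READY;    // connection closed: reconnect on a later pass
                }
                break;
            default:
                break;                      // SHUTDOWN: nothing more to do
        }
    }
}
```

Keeping the state explicit means a failure in either direction never needs special recovery code: dropping back to READY simply causes the next pass to reconnect and report the current CommitLog offset again.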
/**
 * Connect the slave Broker to the master Broker
 * Note:
 * a. At startup, a Broker with the Slave role reads haMasterAddress from the broker configuration file and stores it as the master address;
 * b. If a Slave's haMasterAddress is empty, the Broker starts successfully but no HA connection is made
 * @return true if connected; false otherwise
 */
public boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        // Get the master Broker's HA address
        String addr = this.masterHaAddress.get();
        if (addr != null) {
            // Build a SocketAddress from the address string
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            // Open the SocketChannel
            this.socketChannel = RemotingUtil.connect(socketAddress);
            if (this.socketChannel != null) {
                // Register the SocketChannel for OP_READ (network read events)
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                log.info("HAClient connect to master {}", addr);
                this.changeCurrentState(HAConnectionState.TRANSFER);
            }
        }
        // Maximum CommitLog offset (HA replication progress)
        this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();
        this.lastReadTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
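2. The master Broker accepts the connection
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService is the class through which the master Broker accepts slave connections; it runs as a thread. Its main fields are shown below.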
// Local socket address the master Broker listens on (local IP + port)
private final SocketAddress socketAddressListen;
// Server socket channel (NIO)
private ServerSocketChannel serverSocketChannel;
// NIO selector
private Selector selector;
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept sets up the master Broker to listen for slave connections.
/**
 * Starts listening to slave connections.
 *
 * @throws Exception If fails.
 */
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true); // allow reuse of the local address (SO_REUSEADDR)
    this.serverSocketChannel.socket().bind(this.socketAddressListen); // bind the listening port
    if (0 == messageStoreConfig.getHaListenPort()) {
        messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
        log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());
    }
    this.serverSocketChannel.configureBlocking(false); // non-blocking mode
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // register for OP_ACCEPT (connection events)
}
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run handles the accepted slave connections: for each one it creates and starts an org.apache.rocketmq.store.ha.HAConnection object, which carries the master-slave data synchronization logic.
/**
 * Standard NIO accept handling
 * step1: the selector waits up to 1s for ready events
 * step2: for an accept event, accept the {@link SocketChannel}
 * step3: create an {@link HAConnection} for each connection (handles M-S data synchronization)
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Wait up to 1s for ready events
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();
            if (selected != null) {
                for (SelectionKey k : selected) {
                    // Is this an accept event?
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // Accept the connection and obtain its SocketChannel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                        if (sc != null) {
                            DefaultHAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());
                            try {
                                // Create and start an HAConnection per connection (M-S data synchronization)
                                HAConnection conn = createConnection(sc);
                                conn.start();
                                DefaultHAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }
                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
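The beginAccept()/run() pattern above is plain java.nio, so it can be exercised outside RocketMQ. The sketch below is hypothetical: it listens on the loopback interface with port 0 (the same "let the OS pick a port" case beginAccept() logs), connects one blocking client as a stand-in for the slave, and runs a single select round. Where RocketMQ would wrap the accepted channel in an HAConnection, the sketch just counts and closes it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Sketch, not RocketMQ code: the beginAccept()/run() pattern exercised on the loopback interface.
class AcceptSketch {
    // Opens a listener, connects one client to it, and returns how many connections one select round accepted
    static int acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.socket().setReuseAddress(true);               // SO_REUSEADDR, as in beginAccept()
            server.bind(new InetSocketAddress("127.0.0.1", 0));  // port 0: the OS picks a free port
            server.configureBlocking(false);                     // required before register()
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Stand-in for the slave side: a blocking connect to the listening port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int accepted = 0;
                if (selector.select(1000) > 0) {                 // wait up to 1 s, like run()
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey k = it.next();
                        it.remove();                             // same purpose as selected.clear()
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                            if (sc != null) {
                                accepted++;  // RocketMQ would create and start an HAConnection here
                                sc.close();
                            }
                        }
                    }
                }
                return accepted;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```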
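When org.apache.rocketmq.store.ha.DefaultHAConnection is created and started, it starts its read and write service threads. Its key fields are shown below.
- private WriteSocketService writeSocketService: service through which the master Broker writes data to the slave Broker
- private ReadSocketService readSocketService: service through which the master Broker reads data from the slave Broker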
private final DefaultHAService haService;
private final SocketChannel socketChannel;
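// Address of the HA client (the slave)
private final String clientAddress;
// Service through which the master Broker writes data to the slave Broker
private WriteSocketService writeSocketService;
// Service through which the master Broker reads data from the slave Broker
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// Offset from which the slave Broker requested to pull
private volatile long slaveRequestOffset = -1;
// Offset the slave Broker has acknowledged as replicated
private volatile long slaveAckOffset = -1;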
private FlowMonitor flowMonitor;
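3. The slave Broker reports replication progress
org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster is the core method for data transfer between the slave and master Brokers. As the code below shows, it does two things:
- Slave to master: report the HA replication progress, i.e. currentReportedOffset (the maximum offset of the slave's CommitLog), via org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset.
- Master to slave: receive the replicated message content, handled by org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent.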
/**
 * Report HA replication progress to the master; process messages sent by the master and append them to the CommitLog's memory-mapped buffers
 * {@link DefaultHAClient#reportSlaveMaxOffset(long)}: report HA replication progress, i.e. currentReportedOffset (the maximum offset of the slave's CommitLog)
 * {@link DefaultHAClient#processReadEvent()}: process messages sent by the master and append them to the CommitLog's memory-mapped buffers
 */
private boolean transferFromMaster() throws IOException {
    boolean result;
    // Is it time to report the current offset to the master?
    if (this.isTimeToReportOffset()) {
        log.info("Slave report current offset {}", this.currentReportedOffset);
        // Report the offset to the master
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            return false;
        }
    }
    this.selector.select(1000);
    // Process the message data sent by the master
    result = this.processReadEvent();
    if (!result) {
        return false;
    }
    return reportSlaveMaxOffsetPlus();
}
org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset reports the HA replication progress to the master Broker:
/**
 * Report the pull offset to the master Broker
 * Note:
 * a. maxOffset: for the slave, the offset of the next message to pull;
 *    for the master, the offset requested by this pull, which also serves as a replication ACK
 * b. the ByteBuffer is switched between write mode and read mode by hand
 * c. {@link Buffer#hasRemaining()} tells whether the buffer has been fully written to the SocketChannel (typical NIO write pattern)
 * @param maxOffset offset to pull from for HA
 * @return whether the buffer content was completely written
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Write the offset into the ByteBuffer
    this.reportOffset.position(0); // write position
    this.reportOffset.limit(8); // number of bytes to write
    this.reportOffset.putLong(maxOffset); // write the offset
    // Switch the ByteBuffer from write mode to read mode
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    // Try up to 3 times to write the whole buffer to the SocketChannel
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}
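The report itself is just one long in an 8-byte buffer. The sketch below isolates the buffer handling from reportSlaveMaxOffset(); it is not RocketMQ code, the class name is hypothetical, and any WritableByteChannel stands in for the SocketChannel so the bytes can be inspected.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Sketch of the 8-byte progress report handled by reportSlaveMaxOffset(); not RocketMQ code.
class ReportOffsetSketch {
    // Fill the 8-byte buffer with the offset and switch it to read mode by hand
    static ByteBuffer encode(ByteBuffer reportOffset, long maxOffset) {
        reportOffset.position(0);
        reportOffset.limit(8);
        reportOffset.putLong(maxOffset); // big-endian by default; position is now 8
        reportOffset.position(0);        // same effect as flip() in this case
        reportOffset.limit(8);
        return reportOffset;
    }

    // Up to three write attempts; success only if nothing is left in the buffer
    static boolean writeReport(WritableByteChannel channel, ByteBuffer reportOffset) {
        for (int i = 0; i < 3 && reportOffset.hasRemaining(); i++) {
            try {
                channel.write(reportOffset);
            } catch (IOException e) {
                return false;
            }
        }
        return !reportOffset.hasRemaining();
    }
}
```

A non-blocking channel may accept only part of the 8 bytes per write() call, which is why the loop checks hasRemaining() instead of assuming a single call is enough.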
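4. The ReadSocketService thread reads the slave's replication progress
org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent is where the master Broker reads the slave Broker's pull requests, whose content is the HA replication progress. As the code below shows, after obtaining the slave's progress, the master stores it in DefaultHAConnection#slaveAckOffset (and, for the first request, also in DefaultHAConnection#slaveRequestOffset), then immediately calls notifyTransferSome to wake up the GroupTransferService thread, which checks whether the waiting synchronous-replication requests can be completed.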
/**
 * The master Broker reads the slave Broker's pull requests
 * step1: if byteBufferRead has no space left, call {@link Buffer#flip()}
 * step2: read from the SocketChannel into the remaining space; the content is the offset the slave wants to pull from
 * step3: notify the sending threads waiting for HA replication results
 * @return
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    /*
     When byteBufferRead has no space left: position == limit == capacity.
     After flip(): position == 0, limit == capacity; together with processPosition = 0,
     processing restarts from the beginning of the buffer.
     */
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip(); // reset the ByteBuffer
        this.processPosition = 0;
    }
    // Keep reading while byteBufferRead still has space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read from the SocketChannel into the buffer
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0; // reset
                this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // At least 8 unprocessed bytes: a pull request (an offset) from the slave has arrived
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;
                    // Offset the slave has acknowledged as replicated
                    DefaultHAConnection.this.slaveAckOffset = readOffset;
                    // Record the slave's requested pull offset (first request only)
                    if (DefaultHAConnection.this.slaveRequestOffset < 0) {
                        DefaultHAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);
                    }
                    // Notify the sending threads waiting for HA replication results
                    DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // Stop this round after 3 consecutive zero-byte reads
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
    return true;
}
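The offset-parsing rule inside processReadEvent() is worth isolating: only the last complete 8-byte slot in the buffer is used, and a trailing partial report waits for the next read. The sketch below is hypothetical and not RocketMQ code; like the original, it assumes reports are 8-byte longs written back to back from buffer position 0, so slot boundaries are multiples of 8.

```java
import java.nio.ByteBuffer;

// Sketch of the offset-parsing rule in ReadSocketService#processReadEvent, isolated from NIO; not RocketMQ code.
class ReadOffsetSketch {
    // Returns {newProcessPosition, offset}, or null if fewer than 8 unprocessed bytes are available
    static long[] lastOffset(ByteBuffer buf, int processPosition) {
        if (buf.position() - processPosition < 8) {
            return null;
        }
        int pos = buf.position() - (buf.position() % 8); // end of the last complete 8-byte slot
        long offset = buf.getLong(pos - 8);              // absolute get: buffer position is unchanged
        return new long[] {pos, offset};
    }
}
```

If two reports arrive together, only the newer one matters, because an acknowledged offset subsumes every smaller one.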
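5. WriteSocketService transfers messages to the slave
Once ReadSocketService has read the slave's HA replication progress, org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService uses DefaultHAConnection#slaveRequestOffset to find all messages the slave does not have yet and sends them. The WriteSocketService#run method below contains the core replication logic.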
/**
 * Transfer message content to the HA client
 * step1: if slaveRequestOffset is -1, the master has not yet received a pull request from the slave; skip this round
 * step2: if nextTransferFromWhere is -1, this is the first transfer; compute nextTransferFromWhere (the offset to transfer from)
 * step3: check whether the previous transfer has finished
 *        finished: if the time since the last transfer > the heartbeat interval,
 *                  send a heartbeat header: nextTransferFromWhere + size 0 (no body; keeps the idle long connection from being closed)
 *        not finished: continue the previous transfer; if it still does not finish, skip this round
 * step4: look up all readable messages from the offset the slave is pulling from
 * step5: if the total size exceeds the per-transfer batch size (32KB by default), truncate it; one transfer may then contain a partial message
 * step6: transfer the message content
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // slaveRequestOffset == -1: no pull request from the slave yet, skip this round
            if (-1 == DefaultHAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }
            // nextTransferFromWhere == -1: first transfer, compute the offset to transfer from
            if (-1 == this.nextTransferFromWhere) {
                // slaveRequestOffset == 0: start from the beginning of the CommitLog file that holds the max offset
                if (0 == DefaultHAConnection.this.slaveRequestOffset) {
                    long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                        masterOffset
                            - (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getMappedFileSizeCommitLog());
                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }
                    this.nextTransferFromWhere = masterOffset;
                }
                // slaveRequestOffset != 0: start from the offset requested by the slave
                else {
                    this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;
                }
                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress
                    + "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);
            }
            // lastWriteOver == true: the previous transfer has finished
            if (this.lastWriteOver) {
                // Time elapsed since the last transfer
                long interval =
                    DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                // Elapsed time > heartbeat interval
                if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {
                    // Build Header for a heartbeat: nextTransferFromWhere + size 0 (keeps the idle long connection open)
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();
                    // Transfer the data
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            }
            // lastWriteOver == false: the previous transfer has not finished, continue it
            else {
                // If it still does not finish, skip this round
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
            // Look up all readable messages after the offset the slave is pulling from
            SelectMappedBufferResult selectResult =
                DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                // Cap the transfer at the batch size (32KB by default)
                if (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }
                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
                if (size > canTransferMaxBytes) {
                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
                        log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
                        lastPrintTimestamp = System.currentTimeMillis();
                    }
                    size = canTransferMaxBytes;
                }
                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size; // offset for the next write
                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;
                // Build Header: transfer size bytes of message content (not necessarily complete messages)
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();
                // Transfer the data
                this.lastWriteOver = this.transferData();
            } else {
                DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {
            DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
    DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }
    changeCurrentState(HAConnectionState.SHUTDOWN);
    this.makeStop();
    readSocketService.makeStop();
    haService.removeConnection(DefaultHAConnection.this);
    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }
    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        DefaultHAConnection.log.error("", e);
    }
    DefaultHAConnection.log.info(this.getServiceName() + " service end");
}
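Three calculations in WriteSocketService#run can be pulled out as pure functions: the first transfer offset for a slave that requests offset 0, the chunk size, and the transfer header. The sketch below is hypothetical, not RocketMQ code; the 12-byte header layout (one long offset followed by one int size) is inferred from the putLong/putInt calls above, and the test uses a 1 GB file size, the usual default for mappedFileSizeCommitLog.

```java
import java.nio.ByteBuffer;

// Sketch (not RocketMQ code) of three calculations in WriteSocketService#run.
class TransferSketch {
    static final int HEADER_SIZE = 8 + 4; // phyOffset (long) + body size (int)

    // slaveRequestOffset == 0: start at the beginning of the CommitLog file holding masterMaxOffset
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }

    // One transfer is capped by both the batch size and the flow-control allowance
    static int chunkSize(int available, int batchSize, int flowAllowance) {
        return Math.min(available, Math.min(batchSize, flowAllowance));
    }

    // Header: offset of the first byte sent, then the number of body bytes (0 for a heartbeat)
    static ByteBuffer header(long thisOffset, int size) {
        ByteBuffer h = ByteBuffer.allocate(HEADER_SIZE);
        h.putLong(thisOffset);
        h.putInt(size);
        h.flip(); // ready to be written to the channel
        return h;
    }
}
```

Rounding down to a file boundary means a brand-new slave receives only the newest CommitLog file rather than the master's full history.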
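6. The GroupTransferService thread reports the HA result
org.apache.rocketmq.store.ha.GroupTransferService notifies the blocked message-sending threads once master-slave replication has completed. In synchronous master-slave mode, after a message has been flushed to disk, the sender keeps waiting until the message has been transferred to the slave Broker; the transfer result is then reported back to the sending thread.
1) The collection of requests awaiting HA
org.apache.rocketmq.store.CommitLog#asyncPutMessage stores a message that a producer sends to the Broker (see "RocketMQ 5.0.0 Message Storage <2>: Message Storage Flow" (《RocketMQ5.0.0消息存储<二>_消息存储流程》)). Depending on the synchronous or asynchronous (default) mode, it calls org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA to complete disk flushing and HA replication; the call chain is shown below.
After the producer sends a message to the Broker and the commit completes (the message is written into the memory-mapped file), disk flushing and HA are performed according to the synchronous/asynchronous mode. For HA, the commit request is added to org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite, the master Broker's collection of requests awaiting HA. The code of org.apache.rocketmq.store.CommitLog#handleHA is shown below.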
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
    int needAckNums) {
    if (needAckNums >= 0 && needAckNums <= 1) {
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
    HAService haService = this.defaultMessageStore.getHaService();
    long nextOffset = result.getWroteOffset() + result.getWroteBytes();
    // Wait enough acks from different slaves
    GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
    haService.putRequest(request);
    haService.getWaitNotifyObject().wakeupAll();
    return request.future();
}
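The essence of handleHA() is "return a future that someone else completes later". The sketch below is a hypothetical model, not RocketMQ code: Request and Status only echo GroupCommitRequest and PutMessageStatus, and a plain List stands in for haService.putRequest(). It shows the early return when 0 or 1 acks are needed and the wait target nextOffset = wroteOffset + wroteBytes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of handleHA()/GroupCommitRequest; names echo the code above but this is not RocketMQ code.
class HaRequestSketch {
    enum Status { PUT_OK, FLUSH_SLAVE_TIMEOUT }

    static final class Request {
        final long nextOffset;                                  // offset just past the written message
        final CompletableFuture<Status> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        void wakeupCustomer(Status status) {
            future.complete(status);                            // releases whoever waits on the future
        }
    }

    static CompletableFuture<Status> handleHA(long wroteOffset, int wroteBytes, int needAckNums, List<Request> pending) {
        if (needAckNums >= 0 && needAckNums <= 1) {
            return CompletableFuture.completedFuture(Status.PUT_OK); // the master's own copy is enough
        }
        Request request = new Request(wroteOffset + wroteBytes);
        pending.add(request);                                   // stands in for haService.putRequest(request)
        return request.future;
    }
}
```

Returning a future instead of blocking lets the storage thread move on immediately; the producer's response is sent only when the replication thread completes (or times out) the request.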
2) Notifying the message-sending threads
When HAService starts, it starts the GroupTransferService thread, whose run method is shown below.
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Wait up to 10 ms
            this.waitForRunning(10);
            // After master-slave replication completes, notify the blocked message-sending threads
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
When waitForRunning() runs, it calls org.apache.rocketmq.store.ha.GroupTransferService#swapRequests, which swaps the two collections requestsWrite and requestsRead:
- private volatile List<CommitLog.GroupCommitRequest> requestsWrite: requests on the master Broker that are waiting for HA
- private volatile List<CommitLog.GroupCommitRequest> requestsRead: HA requests the master Broker is currently processing
private void swapRequests() {
    List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
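The requestsWrite/requestsRead pair is a double buffer: producers only ever append to one list while the service thread works through the other, and swapping two references is O(1). The sketch below is hypothetical and not RocketMQ code; Long offsets stand in for GroupCommitRequest objects, and the synchronized methods stand in for whatever locking the real class uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the requestsWrite/requestsRead double buffer in GroupTransferService.
class DoubleBufferSketch {
    private List<Long> requestsWrite = new ArrayList<>(); // producers append here
    private List<Long> requestsRead = new ArrayList<>();  // the service thread drains this one

    // Producer side (cf. putRequest): short critical section
    synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    // Service side (cf. swapRequests): swap references, no copying
    synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Service side (cf. doWaitTransfer): take this round's batch, then clear it for reuse
    synchronized List<Long> drain() {
        List<Long> batch = new ArrayList<>(requestsRead);
        requestsRead.clear();
        return batch;
    }
}
```

Requests added after a swap are simply picked up in the next round, so producers never wait for a slow replication check to finish.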
org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer notifies the blocked message-sending threads after master-slave replication completes, as shown below.
/**
 * After master-slave replication completes, notify the blocked message-sending threads
 * step1: iterate over the commit requests (messages committed to the CommitLog's memory-mapped files)
 * step2: replication succeeded if the maximum offset already replicated to the slave >= the next-message offset returned when the producer's message was written
 */
private void doWaitTransfer() {
    // Lock
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            // Commit requests, i.e. messages committed to the CommitLog's memory-mapped files
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = false;
                long deadLine = req.getDeadLine(); // deadline of this request
                final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;
                for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // stop once the deadline passes
                    if (i > 0) {
                        // Wait up to 1s
                        this.notifyTransferObject.waitForRunning(1000);
                    }
                    if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
                        // Success: max offset replicated to the slave >= next-message offset of this request
                        transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                        continue;
                    }
                    if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
                        // In this mode, we must wait for all replicas that in InSyncStateSet.
                        final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
                        final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
                        if (syncStateSet.size() <= 1) {
                            // Only master
                            transferOK = true;
                            break;
                        }
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
                            if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= syncStateSet.size()) {
                                transferOK = true;
                                break;
                            }
                        }
                    } else {
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            // TODO: We must ensure every HAConnection represents a different slave
                            // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
                            if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= req.getAckNums()) {
                                transferOK = true;
                                break;
                            }
                        }
                    }
                }
                if (!transferOK) {
                    log.warn("transfer message to slave timeout, offset : {}, request acks: {}",
                        req.getNextOffset(), req.getAckNums());
                }
                // Replication finished (or timed out): wake up the message-sending thread
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
            this.requestsRead.clear();
        }
    }
}
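The ack-counting branch at the end of doWaitTransfer() (the non-AutoSwitch case) reduces to a pure function. The sketch below is hypothetical, not RocketMQ code; it assumes needAckNums >= 2, because requests needing 0 or 1 acks are handled earlier (by handleHA and by the push2SlaveMaxOffset check), and an array of acknowledged offsets stands in for the HAConnection list.

```java
// Sketch of the non-AutoSwitch ack-counting branch of doWaitTransfer(); not RocketMQ code.
class AckCountSketch {
    // The master counts as one ack; each slave whose acked offset has reached nextOffset adds one
    static boolean enoughAcks(long[] slaveAckOffsets, long nextOffset, int needAckNums) {
        int ackNums = 1; // include master
        for (long ack : slaveAckOffsets) {
            if (ack >= nextOffset) {
                ackNums++;
            }
            if (ackNums >= needAckNums) {
                return true;
            }
        }
        return false;
    }
}
```

For example, with needAckNums = 2 one up-to-date slave is enough, while needAckNums = 3 needs two slaves that have both reached nextOffset.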
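III. Read/write separation
RocketMQ implements read/write separation quite differently from other middleware. A consumer first sends its pull request to the master; the master returns a batch of messages and, based on its own load and the state of master-slave replication, suggests whether the next pull should go to the master or to a slave.
RocketMQ locates the Broker for a MessageQueue solely by brokerName. A brokerName identifies a group of Brokers (master and slaves) that share the same brokerName but have different brokerIds: the master's brokerId is 0 and the slaves' brokerIds are greater than 0. The lookup is done in org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe.
How read/write separation is applied when consumers pull messages is covered in a later article; see "RocketMQ 5.0.0 Message Consumption <1>: Message Pulling in PUSH Mode" (《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》).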
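Address selection within one brokerName group can be sketched as a lookup in a brokerId-to-address map. This is a hypothetical illustration, not a copy of MQClientInstance#findBrokerAddressInSubscribe: it keeps the rule stated above (0 is the master, greater than 0 is a slave), but the fallback order (suggested broker, then the master, then any member unless onlyThisBroker is set) is an assumption made for the example.

```java
import java.util.Map;

// Hypothetical sketch of choosing an address within one brokerName group (brokerId -> address).
// The fallback order is an assumption, not RocketMQ's actual implementation.
class BrokerLookupSketch {
    static final long MASTER_ID = 0L;

    static String find(Map<Long, String> group, long suggestedBrokerId, boolean onlyThisBroker) {
        String addr = group.get(suggestedBrokerId);           // the broker suggested by the master
        if (addr == null && suggestedBrokerId != MASTER_ID) {
            addr = group.get(MASTER_ID);                      // assumed fallback: the master
        }
        if (addr == null && !onlyThisBroker && !group.isEmpty()) {
            addr = group.values().iterator().next();          // assumed last resort: any member
        }
        return addr;
    }
}
```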
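IV. References
[RocketMQ] The master-slave replication principles you must know to learn RocketMQ (【RocketMQ】学习RocketMQ必须要知道的主从同步原理), 午睡的猫…, CSDN blog
[RocketMQ] How master-slave replication is implemented (【RocketMQ】主从同步实现原理), shanml, cnblogs (博客园)
RocketMQ 5.0.0 Message Storage <2>: Message Storage Flow (RocketMQ5.0.0消息存储<二>_消息存储流程), 爱我所爱0505, CSDN blog
RocketMQ 5.0.0 Message Storage <3>: Message Dispatch and Recovery (RocketMQ5.0.0消息存储<三>_消息转发与恢复机制), 爱我所爱0505, CSDN blog
RocketMQ 5.0.0 Message Storage <4>: Disk Flushing (RocketMQ5.0.0消息存储<四>_刷盘机制), 爱我所爱0505, CSDN blog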