RocketMQ-15丨RocketMQ主从复制

Posted by jiefang on July 6, 2021

RocketMQ主从复制

原理

为了提高消息消费的高可用性,避免 Broker 发生单点故障导致存储在 Broker 上的消息无法及时消费,RocketMQ 引入了 Broker 主备机制:消息到达主服务器后,需要将消息同步到从服务器;如果主服务器 Broker 宕机,消息消费者可以从从服务器拉取消息。

流程

  1. master服务器启动,并在特定端口上监听从服务器的连接;
  2. slave服务器主动连接主服务器,master服务器接收客户端的连接,并建立相关TCP 连接;
  3. slave服务器主动向master服务器发送待拉取消息偏移量,master服务器解析请求并返回消息给从服务器;
  4. slave服务器保存消息并继续发送新的消息同步请求;

核心类

  • HAService: RocketMQ 主从同步核心实现类;
  • HAService$AcceptSocketService : HA Master 端监听客户端连接实现类;
  • HAService$GroupTransferService :主从同步通知实现类;
  • HAService$HAClient: HA Client 端实现类;
  • HAConnection : HA Master 服务端HA 连接对象的封装,与Broker 从服务器的网络读写实现类;
  • HAConnection$ReadSocketService: HA Master 网络读实现类;
  • HAConnection$WriteSocketService: HA Master 网络写实现类;

Master端

HAService#start()

1
2
3
4
5
6
7
8
9
10
/**
 * Starts the HA service.
 *
 * <p>Invokes, strictly in this order: {@code acceptSocketService.beginAccept()},
 * {@code acceptSocketService.start()}, {@code groupTransferService.start()} and
 * {@code haClient.start()}.
 *
 * @throws Exception if any of the start-up steps fails
 */
public void start() throws Exception {
    acceptSocketService.beginAccept();
    acceptSocketService.start();
    groupTransferService.start();
    haClient.start();
}

AcceptSocketService

AcceptSocketService#beginAccept()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Master-side service that listens for incoming client (slave) connections.
 * Listens to slave connections to create {@link HAConnection}.
 */
class AcceptSocketService extends ServiceThread {
    /**
     * Starts listening to slave connections.
     *
     * <p>Opens the server channel and a selector, enables address reuse, binds the
     * channel to {@code socketAddressListen}, switches it to non-blocking mode and
     * registers it with the selector for {@code OP_ACCEPT}.
     *
     * @throws Exception If fails.
     */
    public void beginAccept() throws Exception {
        serverSocketChannel = ServerSocketChannel.open();
        selector = RemotingUtil.openSelector();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.socket().bind(socketAddressListen);
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
}

AcceptSocketService#start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Accept loop: polls the selector and, for every key that is ready for
 * {@code OP_ACCEPT}, accepts the channel, wraps it in an {@link HAConnection},
 * starts the connection and registers it with the enclosing {@code HAService}.
 * {@inheritDoc}
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            Set<SelectionKey> readyKeys = this.selector.selectedKeys();
            if (readyKeys == null) {
                continue;
            }

            for (SelectionKey key : readyKeys) {
                // Only accept-readiness is expected on this selector.
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == 0) {
                    log.warn("Unexpected ops in select " + key.readyOps());
                    continue;
                }

                SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
                if (accepted == null) {
                    continue;
                }

                HAService.log.info("HAService receive new connection, "
                    + accepted.socket().getRemoteSocketAddress());
                try {
                    HAConnection connection = new HAConnection(HAService.this, accepted);
                    connection.start();
                    HAService.this.addConnection(connection);
                } catch (Exception e) {
                    // The connection could not be set up: close the accepted channel.
                    log.error("new HAConnection exception", e);
                    accepted.close();
                }
            }
            // Selected keys must be cleared by hand, otherwise they are reported again.
            readyKeys.clear();
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

GroupTransferService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * GroupTransferService Service.
 *
 * <p>Blocking part of master/slave synchronisation: once replication to the slave
 * has finished, it notifies the message-sender threads that are blocked waiting
 * for the HA result.
 */
class GroupTransferService extends ServiceThread {
    /**
     * Adds a request to {@code requestsWrite} and then wakes this service up.
     *
     * @param request the group-commit request to enqueue
     */
    public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
        synchronized (requestsWrite) {
            requestsWrite.add(request);
        }
        wakeup();
    }

    /**
     * Service loop: until stopped, waits via {@code waitForRunning(10)} and then
     * calls {@code doWaitTransfer()}. Exceptions are logged and the loop goes on.
     */
    public void run() {
        log.info(getServiceName() + " service started");
        while (!isStopped()) {
            try {
                waitForRunning(10);
                doWaitTransfer();
            } catch (Exception e) {
                log.warn(getServiceName() + " service has exception. ", e);
            }
        }
        log.info(getServiceName() + " service end");
    }
}

GroupTransferService#doWaitTransfer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Waits for the replication result of every pending request and reports it.
 *
 * <p>For each request in {@code requestsRead}, waits until
 * {@code push2SlaveMaxOffset} has reached the request's next offset or the
 * configured sync-flush timeout has elapsed, then wakes the waiting customer with
 * {@code PUT_OK} or {@code FLUSH_SLAVE_TIMEOUT}. The list is cleared afterwards.
 */
private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (this.requestsRead.isEmpty()) {
            return;
        }

        for (CommitLog.GroupCommitRequest request : this.requestsRead) {
            boolean replicated = HAService.this.push2SlaveMaxOffset.get() >= request.getNextOffset();
            // Deadline = now + configured sync-flush timeout.
            long deadline = HAService.this.defaultMessageStore.getSystemClock().now()
                + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();

            while (!replicated && HAService.this.defaultMessageStore.getSystemClock().now() < deadline) {
                // Park on the notify object (argument 1000, presumably milliseconds), then re-check.
                this.notifyTransferObject.waitForRunning(1000);
                replicated = HAService.this.push2SlaveMaxOffset.get() >= request.getNextOffset();
            }

            if (!replicated) {
                log.warn("transfer messsage to slave timeout, " + request.getNextOffset());
            }

            // Hand the replication outcome back to the waiting sender.
            if (replicated) {
                request.wakeupCustomer(PutMessageStatus.PUT_OK);
            } else {
                request.wakeupCustomer(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
        }
        this.requestsRead.clear();
    }
}

HAConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Master-side wrapper around one HA connection; implements the network reads and
 * writes between the master and a slave.
 *
 * <p>After the master accepts a slave's connection request, the resulting
 * {@link SocketChannel} is wrapped in an {@code HAConnection}, which owns one
 * read service and one write service for that channel.
 */
public class HAConnection {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final HAService haService;
    private final SocketChannel socketChannel;
    private final String clientAddr;
    /** Service through which the master writes data to the slave. */
    private WriteSocketService writeSocketService;
    /** Service through which the master reads data from the slave. */
    private ReadSocketService readSocketService;
    /** Offset from which the slave asked to pull data; starts at -1. */
    private volatile long slaveRequestOffset = -1;
    /** Offset up to which the slave has acknowledged pulled data; starts at -1. */
    private volatile long slaveAckOffset = -1;

    /**
     * Wraps an accepted channel: records the remote address, switches the channel
     * to non-blocking mode, applies socket options, creates the read and write
     * services and increments the HA service's connection counter.
     *
     * @param haService     owning HA service
     * @param socketChannel channel accepted from the slave
     * @throws IOException if configuring the channel or creating a service fails
     */
    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = socketChannel.socket().getRemoteSocketAddress().toString();

        socketChannel.configureBlocking(false);
        socketChannel.socket().setSoLinger(false, -1);
        socketChannel.socket().setTcpNoDelay(true);
        socketChannel.socket().setReceiveBufferSize(1024 * 64);
        socketChannel.socket().setSendBufferSize(1024 * 64);

        this.writeSocketService = new WriteSocketService(socketChannel);
        this.readSocketService = new ReadSocketService(socketChannel);
        haService.getConnectionCount().incrementAndGet();
    }

    /**
     * Starts the read service first, then the write service.
     */
    public void start() {
        readSocketService.start();
        writeSocketService.start();
    }
}

ReadSocketService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * Master-side network read service for one HA connection.
 *
 * <p>Owns its own selector, on which the connection's channel is registered for
 * {@code OP_READ}.
 */
class ReadSocketService extends ServiceThread {
    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
    private final Selector selector;
    private final SocketChannel socketChannel;
    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    private int processPosition = 0;
    private volatile long lastReadTimestamp = System.currentTimeMillis();

    /**
     * Opens a selector, registers the channel for read events and marks this
     * service as a daemon.
     *
     * @param socketChannel channel connected to the slave
     * @throws IOException if the selector cannot be opened or registration fails
     */
    public ReadSocketService(final SocketChannel socketChannel) throws IOException {
        this.selector = RemotingUtil.openSelector();
        this.socketChannel = socketChannel;
        this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        this.setDaemon(true);
    }

    /**
     * Read loop. Each round polls the selector, processes readable data and checks
     * that the connection has not been idle longer than the housekeeping interval.
     * The loop ends on a read failure, an expired connection or any exception;
     * afterwards both services of the connection are stopped, the connection is
     * removed from the HA service and the selector and channel are closed.
     */
    @Override
    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                // Handle whatever is readable on the channel.
                boolean ok = this.processReadEvent();
                if (!ok) {
                    HAConnection.log.error("processReadEvent error");
                    break;
                }
                long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                    break;
                }
            } catch (Exception e) {
                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        this.makeStop();

        writeSocketService.makeStop();

        haService.removeConnection(HAConnection.this);

        HAConnection.this.haService.getConnectionCount().decrementAndGet();

        SelectionKey sk = this.socketChannel.keyFor(this.selector);
        if (sk != null) {
            sk.cancel();
        }

        // Close the selector and the channel independently, so that a failure while
        // closing the selector cannot leave the socket open.
        try {
            this.selector.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }
        try {
            this.socketChannel.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }
        HAConnection.log.info(this.getServiceName() + " service end");
    }
}
ReadSocketService#processReadEvent()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
    /**
     * Reads the offsets reported by the slave from the channel.
     *
     * <p>Each time at least 8 unprocessed bytes are available, the last complete
     * 8-byte value in the buffer is taken as the slave's acknowledged offset and
     * passed to {@code notifyTransferSome}. The first value received is also stored
     * as the slave's requested offset.
     *
     * @return {@code false} if the channel reported end-of-stream or an
     *         {@link IOException} occurred, {@code true} otherwise
     */
    private boolean processReadEvent() {
        int emptyReads = 0;
        // Buffer full: flip() sets the limit to the current position and rewinds the
        // position to 0, so reading and processing both restart from the beginning.
        if (!this.byteBufferRead.hasRemaining()) {
            this.byteBufferRead.flip();
            this.processPosition = 0;
        }

        while (this.byteBufferRead.hasRemaining()) {
            try {
                final int bytesRead = this.socketChannel.read(this.byteBufferRead);

                // Negative read: the peer closed its side of the connection.
                if (bytesRead < 0) {
                    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                    return false;
                }

                // Nothing available: give up after three consecutive empty reads.
                if (bytesRead == 0) {
                    if (++emptyReads >= 3) {
                        break;
                    }
                    continue;
                }

                emptyReads = 0;
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

                // Fewer than 8 unprocessed bytes: no complete offset report yet.
                if ((this.byteBufferRead.position() - this.processPosition) < 8) {
                    continue;
                }

                // Align down to a multiple of 8 and take the last complete long.
                int alignedPos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                long reportedOffset = this.byteBufferRead.getLong(alignedPos - 8);
                this.processPosition = alignedPos;

                HAConnection.this.slaveAckOffset = reportedOffset;
                if (HAConnection.this.slaveRequestOffset < 0) {
                    HAConnection.this.slaveRequestOffset = reportedOffset;
                    log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + reportedOffset);
                }
                // Publish the offset acknowledged by the slave to the HA service.
                HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
            } catch (IOException e) {
                log.error("processReadEvent exception", e);
                return false;
            }
        }
        return true;
    }
}

WriteSocketService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
 * Master-side network write service for one HA connection.
 *
 * <p>Owns its own selector, on which the connection's channel is registered for
 * {@code OP_WRITE}. Every transfer is a 12-byte header (8-byte offset plus 4-byte
 * body size) optionally followed by a slice of commit-log data.
 */
class WriteSocketService extends ServiceThread {
    private final Selector selector;
    private final SocketChannel socketChannel;

    // Header layout: 8-byte physical offset + 4-byte body size.
    private final int headerSize = 8 + 4;
    private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
    // Next offset to transfer; -1 until the first transfer position is computed.
    private long nextTransferFromWhere = -1;
    // Commit-log slice currently being written, or null when none is pending.
    private SelectMappedBufferResult selectMappedBufferResult;
    // Whether the previous header/body pair was written completely.
    private boolean lastWriteOver = true;
    private long lastWriteTimestamp = System.currentTimeMillis();

    /**
     * Opens a selector, registers the channel for write events and marks this
     * service as a daemon.
     *
     * @param socketChannel channel connected to the slave
     * @throws IOException if the selector cannot be opened or registration fails
     */
    public WriteSocketService(final SocketChannel socketChannel) throws IOException {
        this.selector = RemotingUtil.openSelector();
        this.socketChannel = socketChannel;
        this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
        this.setDaemon(true);
    }

    /**
     * Write loop. Waits until the slave has reported an offset, determines where to
     * start transferring, finishes any partially written data (or sends a
     * header-only heartbeat when idle for too long) and then pushes the next slice
     * of commit-log data. The loop ends when the service is stopped or an exception
     * occurs; afterwards the pending slice is released, both services of the
     * connection are stopped, the connection is removed from the HA service and
     * the selector and channel are closed.
     */
    @Override
    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                // slaveRequestOffset == -1: no pull request from the slave has been
                // received yet, so skip this round.
                if (-1 == HAConnection.this.slaveRequestOffset) {
                    Thread.sleep(10);
                    continue;
                }
                // nextTransferFromWhere == -1: first transfer on this connection.
                // If the slave asked for offset 0, start from the commit log's max
                // offset rounded down to a multiple of the mapped-file size;
                // otherwise start from the offset the slave asked for.
                if (-1 == this.nextTransferFromWhere) {
                    if (0 == HAConnection.this.slaveRequestOffset) {
                        long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                        masterOffset =
                            masterOffset
                                - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                .getMappedFileSizeCommitLog());

                        if (masterOffset < 0) {
                            masterOffset = 0;
                        }

                        this.nextTransferFromWhere = masterOffset;
                    } else {
                        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                    }

                    log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                        + "], and slave request " + HAConnection.this.slaveRequestOffset);
                }
                // Was the previous write completed?
                if (this.lastWriteOver) {

                    long interval =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                    // Idle longer than the heartbeat interval: send a header-only
                    // heartbeat (body size 0).
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaSendHeartbeatInterval()) {

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                        this.byteBufferHeader.putInt(0);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver) {
                            continue;
                        }
                    }
                } else {
                    // Previous data was not fully written: keep transferring it.
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver) {
                        continue;
                    }
                }

                // Look up the commit-log data starting at the next transfer offset.
                SelectMappedBufferResult selectResult =
                    HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                if (selectResult != null) {
                    int size = selectResult.getSize();
                    // Cap one transfer at the configured HA batch size.
                    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                    }

                    long thisOffset = this.nextTransferFromWhere;
                    this.nextTransferFromWhere += size;

                    selectResult.getByteBuffer().limit(size);
                    this.selectMappedBufferResult = selectResult;

                    // Build Header
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(thisOffset);
                    this.byteBufferHeader.putInt(size);
                    this.byteBufferHeader.flip();

                    this.lastWriteOver = this.transferData();
                } else {
                    // No data found: wait on the shared notify object (argument 100).
                    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                }
            } catch (Exception e) {

                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

        if (this.selectMappedBufferResult != null) {
            this.selectMappedBufferResult.release();
        }

        this.makeStop();

        readSocketService.makeStop();

        haService.removeConnection(HAConnection.this);

        SelectionKey sk = this.socketChannel.keyFor(this.selector);
        if (sk != null) {
            sk.cancel();
        }

        // Close the selector and the channel independently, so that a failure while
        // closing the selector cannot leave the socket open.
        try {
            this.selector.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }
        try {
            this.socketChannel.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }

        HAConnection.log.info(this.getServiceName() + " service end");
    }

    /**
     * Writes the pending header and, if a commit-log slice is pending, its body to
     * the channel. Each write loop gives up after three consecutive zero-byte
     * writes. A fully written slice is released and cleared.
     *
     * @return {@code true} if the header and any pending body were written completely
     * @throws Exception if the channel reports a negative write size, or the
     *                   underlying write fails
     */
    private boolean transferData() throws Exception {
        int writeSizeZeroTimes = 0;
        // Write Header
        while (this.byteBufferHeader.hasRemaining()) {
            int writeSize = this.socketChannel.write(this.byteBufferHeader);
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                throw new Exception("ha master write header error < 0");
            }
        }

        // Header-only transfer (e.g. heartbeat): done once the header is out.
        if (null == this.selectMappedBufferResult) {
            return !this.byteBufferHeader.hasRemaining();
        }

        writeSizeZeroTimes = 0;

        // Write Body (only after the header has been written completely)
        if (!this.byteBufferHeader.hasRemaining()) {
            while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write body error < 0");
                }
            }
        }

        boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

        // Body fully written: release the slice so it is not sent again.
        if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            this.selectMappedBufferResult.release();
            this.selectMappedBufferResult = null;
        }
        return result;
    }
}

Slave端

HAClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * HA client implementation (slave side).
 */
class HAClient extends ServiceThread {
    /** Size of the socket read buffers. */
    private static final int READ_MAX_BUFFER_SIZE = 4 * 1024 * 1024;
    /** Address of the master. */
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    /** 8-byte buffer holding the offset the slave reports to the master. */
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
    private SocketChannel socketChannel;
    /** Event selector. */
    private Selector selector;
    /** Timestamp of the last write. */
    private long lastWriteTimestamp = System.currentTimeMillis();
    /** Replication progress last reported by this slave (max commit-log offset). */
    private long currentReportedOffset = 0;
    /** Position in the read buffer up to which data has been dispatched. */
    private int dispatchPosition = 0;
    /** Read buffer. */
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    /** Backup read buffer, swapped with {@code byteBufferRead}. */
    private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    /**
     * Creates the client and opens its selector.
     *
     * @throws IOException if the selector cannot be opened
     */
    public HAClient() throws IOException {
        selector = RemotingUtil.openSelector();
    }
}

HAClient#run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Main loop of the HA client.
 *
 * <p>While running: connects to the master (waiting and retrying when that is
 * not possible), reports the current offset when it is time to do so, polls the
 * selector, processes data read from the master, reports progress again and
 * closes the connection if nothing has been written for longer than the
 * housekeeping interval.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Not connected to the master: back off, then try again.
            if (!this.connectMaster()) {
                this.waitForRunning(1000 * 5);
                continue;
            }

            // Report the slave's replication progress when due; drop the
            // connection if the report could not be written completely.
            if (this.isTimeToReportOffset()
                && !this.reportSlaveMaxOffset(this.currentReportedOffset)) {
                this.closeMaster();
            }

            // Poll the selector with a timeout of 1000.
            this.selector.select(1000);

            // Handle data pushed by the master.
            if (!this.processReadEvent()) {
                this.closeMaster();
            }

            if (!reportSlaveMaxOffsetPlus()) {
                continue;
            }

            long sinceLastWrite =
                HAService.this.getDefaultMessageStore().getSystemClock().now()
                    - this.lastWriteTimestamp;
            if (sinceLastWrite > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                .getHaHousekeepingInterval()) {
                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                    + "] expired, " + sinceLastWrite);
                this.closeMaster();
                log.warn("HAClient, master not response some time, so close connection");
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
    log.info(this.getServiceName() + " service end");
}

HAClient#reportSlaveMaxOffset()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Reports an offset from the slave to the master.
 *
 * <p>The channel is non-blocking, so a single {@code write} may not drain the
 * buffer; up to three writes are attempted.
 *
 * @param maxOffset offset to report
 * @return {@code true} if all 8 bytes were written, {@code false} if bytes remain
 *         or the write threw an {@link IOException}
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Fill the buffer with the offset, then flip it from write mode to read mode
    // (position back to 0, limit at the 8 bytes just written).
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.flip();

    int attempts = 0;
    while (attempts < 3 && this.reportOffset.hasRemaining()) {
        try {
            // Push whatever is still unwritten to the channel.
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
        attempts++;
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}

HAClient#processReadEvent()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Slave-side handling of network read events.
 *
 * <p>While the read buffer has free space, reads from the channel into it:
 * <ol>
 *   <li>bytes were read: reset the empty-read counter and dispatch the buffered
 *       data via {@code dispatchReadRequest()};</li>
 *   <li>three consecutive reads returned 0 bytes: stop reading and return
 *       {@code true};</li>
 *   <li>the read returned a negative count, dispatching failed or an
 *       {@link IOException} occurred: return {@code false}.</li>
 * </ol>
 *
 * @return {@code false} on read or dispatch failure, {@code true} otherwise
 */
private boolean processReadEvent() {
    int emptyReads = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            final int bytesRead = this.socketChannel.read(this.byteBufferRead);

            if (bytesRead < 0) {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }

            if (bytesRead == 0) {
                if (++emptyReads >= 3) {
                    break;
                }
                continue;
            }

            emptyReads = 0;
            if (!this.dispatchReadRequest()) {
                log.error("HAClient, dispatchReadRequest error");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}

HAClient#dispatchReadRequest()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Dispatches the data buffered in {@code byteBufferRead}.
 *
 * <p>Repeatedly decodes a 12-byte header (8-byte master physical offset plus
 * 4-byte body size) at {@code dispatchPosition}. When the complete body is also
 * buffered, it is appended to the local commit log and the progress is reported
 * via {@code reportSlaveMaxOffsetPlus()}. When no complete frame is left, the
 * buffer is reallocated if it is full and the loop ends.
 *
 * @return {@code false} if the master's offset does not match the slave's
 *         non-zero max physical offset or reporting progress fails,
 *         {@code true} otherwise
 */
private boolean dispatchReadRequest() {
    final int headerLength = 8 + 4; // phyoffset + size
    final int socketReadPos = this.byteBufferRead.position();

    while (true) {
        int pending = this.byteBufferRead.position() - this.dispatchPosition;
        // A full header is available.
        if (pending >= headerLength) {
            // Physical offset announced by the master, and the body length.
            long masterOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodyLength = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // Current max physical offset of this slave.
            long slaveOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slaveOffset != 0 && slaveOffset != masterOffset) {
                log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                    + slaveOffset + " MASTER: " + masterOffset);
                return false;
            }

            // The full body is available as well.
            if (pending >= (headerLength + bodyLength)) {
                byte[] body = new byte[bodyLength];
                this.byteBufferRead.position(this.dispatchPosition + headerLength);
                this.byteBufferRead.get(body);
                // Append the data to the slave's local commit log.
                HAService.this.defaultMessageStore.appendToCommitLog(masterOffset, body);

                // Restore the position at which socket reads continue.
                this.byteBufferRead.position(socketReadPos);
                this.dispatchPosition += headerLength + bodyLength;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        // No complete frame left; make room if the buffer is full.
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }
        break;
    }
    return true;
}