同步刷盘方式: 在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的 CommitLog 文件。
CommitLog的handleDiskFlush方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this .defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this .flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush( this .defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error( "do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (! this .defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
class GroupCommitService extends FlushCommitLogService { private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
//提交刷盘任务到任务列表 public synchronized void putRequest( final GroupCommitRequest request) { synchronized ( this .requestsWrite) { this .requestsWrite.add(request); } if (hasNotified.compareAndSet( false , true )) { waitPoint.countDown(); // notify } }
private void swapRequests() { List<GroupCommitRequest> tmp = this .requestsWrite; this .requestsWrite = this .requestsRead; this .requestsRead = tmp; }
private void doCommit() { synchronized ( this .requestsRead) { if (! this .requestsRead.isEmpty()) { for (GroupCommitRequest req : this .requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false ; for ( int i = 0 ; i < 2 && !flushOK; i++) { flushOK = CommitLog. this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) { CommitLog. this .mappedFileQueue.flush( 0 ); } }
req.wakeupCustomer(flushOK); }
long storeTimestamp = CommitLog. this .mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0 ) { CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); }
this .requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog. this .mappedFileQueue.flush( 0 ); } } }
public void run() { CommitLog.log.info( this .getServiceName() + " service started" );
while (! this .isStopped()) { try { this .waitForRunning( 10 ); this .doCommit(); } catch (Exception e) { CommitLog.log.warn( this .getServiceName() + " service has exception. " , e); } }
// Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep( 10 ); } catch (InterruptedException e) { CommitLog.log.warn( "GroupCommitService Exception, " , e); }
synchronized ( this ) { this .swapRequests(); }
this .doCommit();
CommitLog.log.info( this .getServiceName() + " service end" ); }
@Override protected void onWaitEnd() { this .swapRequests(); }
@Override public String getServiceName() { return GroupCommitService. class .getSimpleName(); }
@Override public long getJointime() { return 1000 * 60 * 5 ; } } |
GroupCommitRequest 是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程
GroupCommitService 每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。
putRequest(request) 提交刷盘任务到任务列表 request.waitForFlush同步等待 GroupCommitService 将任务列表中的任务刷盘完成。两个队列读写分离, requestsWrite 是写队列,用户保存添加进来的刷盘任务, requestsRead 是读队列,在刷盘之前会把写队列的数据放入读队列。
CommitLog的doCommit方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
private void doCommit() { synchronized ( this .requestsRead) { if (! this .requestsRead.isEmpty()) { for (GroupCommitRequest req : this .requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false ; for ( int i = 0 ; i < 2 && !flushOK; i++) { //根据offset确定是否已经刷盘 flushOK = CommitLog. this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) { CommitLog. this .mappedFileQueue.flush( 0 ); } }
req.wakeupCustomer(flushOK); }
long storeTimestamp = CommitLog. this .mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0 ) { CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } //清空已刷盘的列表 this .requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog. this .mappedFileQueue.flush( 0 ); } } } |
刷盘的时候依次读取 requestsRead 中的数据写入磁盘, 写入完成后清空 requestsRead 。
读写分离设计的目的是在刷盘时不影响任务提交到列表。
CommitLog.this.mappedFileQueue.flush(0);是刷盘操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public boolean flush( final int flushLeastPages) { boolean result = true ; MappedFile mappedFile = this .findMappedFileByOffset( this .flushedWhere, this .flushedWhere == 0 ); if (mappedFile != null ) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this .flushedWhere; this .flushedWhere = where; if ( 0 == flushLeastPages) { this .storeTimestamp = tmpTimeStamp; } }
return result; } |
通过MappedFile映射的CommitLog文件写入磁盘
这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。
到此这篇关于RocketMQ设计之同步刷盘的文章就介绍到这了,更多相关RocketMQ同步刷盘内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://blog.51cto.com/u_15460453/5075812