好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ设计之同步刷盘

同步刷盘方式: 在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的 CommitLog 文件。

CommitLog的handleDiskFlush方法:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {

    // Synchronization flush

    if (FlushDiskType.SYNC_FLUSH == this .defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

        final GroupCommitService service = (GroupCommitService) this .flushCommitLogService;

        if (messageExt.isWaitStoreMsgOK()) {

            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

            service.putRequest(request);

            boolean flushOK = request.waitForFlush( this .defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

            if (!flushOK) {

                log.error( "do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()

                    + " client address: " + messageExt.getBornHostString());

                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);

            }

        } else {

            service.wakeup();

        }

    }

    // Asynchronous flush

    else {

        if (! this .defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {

            flushCommitLogService.wakeup();

        } else {

            commitLogService.wakeup();

        }

    }

}

 

 

class GroupCommitService extends FlushCommitLogService {

        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();

        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

 

         //提交刷盘任务到任务列表

        public synchronized void putRequest( final GroupCommitRequest request) {

            synchronized ( this .requestsWrite) {

                this .requestsWrite.add(request);

            }

            if (hasNotified.compareAndSet( false , true )) {

                waitPoint.countDown(); // notify

            }

        }

 

        private void swapRequests() {

            List<GroupCommitRequest> tmp = this .requestsWrite;

            this .requestsWrite = this .requestsRead;

            this .requestsRead = tmp;

        }

 

        private void doCommit() {

            synchronized ( this .requestsRead) {

                if (! this .requestsRead.isEmpty()) {

                    for (GroupCommitRequest req : this .requestsRead) {

                        // There may be a message in the next file, so a maximum of

                        // two times the flush

                        boolean flushOK = false ;

                        for ( int i = 0 ; i < 2 && !flushOK; i++) {

                            flushOK = CommitLog. this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

 

                            if (!flushOK) {

                                CommitLog. this .mappedFileQueue.flush( 0 );

                            }

                        }

 

                        req.wakeupCustomer(flushOK);

                    }

 

                    long storeTimestamp = CommitLog. this .mappedFileQueue.getStoreTimestamp();

                    if (storeTimestamp > 0 ) {

                        CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

                    }

 

                    this .requestsRead.clear();

                } else {

                    // Because of individual messages is set to not sync flush, it

                    // will come to this process

                    CommitLog. this .mappedFileQueue.flush( 0 );

                }

            }

        }

 

        public void run() {

            CommitLog.log.info( this .getServiceName() + " service started" );

 

            while (! this .isStopped()) {

                try {

                    this .waitForRunning( 10 );

                    this .doCommit();

                } catch (Exception e) {

                    CommitLog.log.warn( this .getServiceName() + " service has exception. " , e);

                }

            }

 

            // Under normal circumstances shutdown, wait for the arrival of the

            // request, and then flush

            try {

                Thread.sleep( 10 );

            } catch (InterruptedException e) {

                CommitLog.log.warn( "GroupCommitService Exception, " , e);

            }

 

            synchronized ( this ) {

                this .swapRequests();

            }

 

            this .doCommit();

 

            CommitLog.log.info( this .getServiceName() + " service end" );

        }

 

        @Override

        protected void onWaitEnd() {

            this .swapRequests();

        }

 

        @Override

        public String getServiceName() {

            return GroupCommitService. class .getSimpleName();

        }

 

        @Override

        public long getJointime() {

            return 1000 * 60 * 5 ;

        }

    }

GroupCommitRequest 是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程

GroupCommitService 每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。

putRequest(request) 提交刷盘任务到任务列表 request.waitForFlush同步等待 GroupCommitService 将任务列表中的任务刷盘完成。

两个队列读写分离, requestsWrite 是写队列,用户保存添加进来的刷盘任务, requestsRead 是读队列,在刷盘之前会把写队列的数据放入读队列。

CommitLog的doCommit方法:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

private void doCommit() {

            synchronized ( this .requestsRead) {

                if (! this .requestsRead.isEmpty()) {

                    for (GroupCommitRequest req : this .requestsRead) {

                        // There may be a message in the next file, so a maximum of

                        // two times the flush

                        boolean flushOK = false ;

                        for ( int i = 0 ; i < 2 && !flushOK; i++) {

                            //根据offset确定是否已经刷盘

                            flushOK = CommitLog. this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

 

                            if (!flushOK) {

                                CommitLog. this .mappedFileQueue.flush( 0 );

                            }

                        }

 

                        req.wakeupCustomer(flushOK);

                    }

 

                    long storeTimestamp = CommitLog. this .mappedFileQueue.getStoreTimestamp();

                    if (storeTimestamp > 0 ) {

                        CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

                    }

                     //清空已刷盘的列表

                    this .requestsRead.clear();

                } else {

                    // Because of individual messages is set to not sync flush, it

                    // will come to this process

                    CommitLog. this .mappedFileQueue.flush( 0 );

                }

            }

        }

刷盘的时候依次读取 requestsRead 中的数据写入磁盘, 写入完成后清空 requestsRead 。

读写分离设计的目的是在刷盘时不影响任务提交到列表。

CommitLog.this.mappedFileQueue.flush(0);是刷盘操作:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

public boolean flush( final int flushLeastPages) {

    boolean result = true ;

    MappedFile mappedFile = this .findMappedFileByOffset( this .flushedWhere, this .flushedWhere == 0 );

    if (mappedFile != null ) {

        long tmpTimeStamp = mappedFile.getStoreTimestamp();

        int offset = mappedFile.flush(flushLeastPages);

        long where = mappedFile.getFileFromOffset() + offset;

        result = where == this .flushedWhere;

        this .flushedWhere = where;

        if ( 0 == flushLeastPages) {

            this .storeTimestamp = tmpTimeStamp;

        }

    }

 

    return result;

}

通过MappedFile映射的CommitLog文件写入磁盘

这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。

到此这篇关于RocketMQ设计之同步刷盘的文章就介绍到这了,更多相关RocketMQ同步刷盘内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.51cto.com/u_15460453/5075812

查看更多关于RocketMQ设计之同步刷盘的详细内容...

  阅读:12次