好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RocketMQ设计之异步刷盘

上一篇 RocketMQ设计之同步刷盘

异步刷盘方式: 在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE ,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

RocketMQ 默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池

CommitLog的handleDiskFlush方法:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {

    // Synchronization flush

    if (FlushDiskType.SYNC_FLUSH == this .defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

        final GroupCommitService service = (GroupCommitService) this .flushCommitLogService;

        if (messageExt.isWaitStoreMsgOK()) {

            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

            service.putRequest(request);

            boolean flushOK = request.waitForFlush( this .defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

            if (!flushOK) {

                log.error( "do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()

                    + " client address: " + messageExt.getBornHostString());

                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);

            }

        } else {

            service.wakeup();

        }

    }

    // Asynchronous flush

    else {

        if (! this .defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {

            flushCommitLogService.wakeup();

        } else {

            commitLogService.wakeup();

        }

    }

}

不开启缓冲池: 默认不开启,刷盘线程 FlushRealTimeService 会每间隔500毫秒尝试去刷盘。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

class FlushRealTimeService extends FlushCommitLogService {

    private long lastFlushTimestamp = 0 ;

    private long printTimes = 0 ;

 

    public void run() {

        CommitLog.log.info( this .getServiceName() + " service started" );

 

        while (! this .isStopped()) {

            boolean flushCommitLogTimed = CommitLog. this .defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

 

            //每次Flush间隔500毫秒

            int interval = CommitLog. this .defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();

            //每次Flush最少4页内存数据(16KB)

            int flushPhysicQueueLeastPages = CommitLog. this .defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

 

                //距离上次刷盘时间阈值为10秒

            int flushPhysicQueueThoroughInterval =

                CommitLog. this .defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

 

            boolean printFlushProgress = false ;

 

            // Print flush progress

            long currentTimeMillis = System.currentTimeMillis();

            if (currentTimeMillis >= ( this .lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {

                this .lastFlushTimestamp = currentTimeMillis;

                flushPhysicQueueLeastPages = 0 ;

                printFlushProgress = (printTimes++ % 10 ) == 0 ;

            }

 

            try {

                if (flushCommitLogTimed) {

                    Thread.sleep(interval);

                } else {

                    this .waitForRunning(interval);

                }

 

                if (printFlushProgress) {

                    this .printFlushProgress();

                }

 

                long begin = System.currentTimeMillis();

                CommitLog. this .mappedFileQueue.flush(flushPhysicQueueLeastPages);

                long storeTimestamp = CommitLog. this .mappedFileQueue.getStoreTimestamp();

                if (storeTimestamp > 0 ) {

                    CommitLog. this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

                }

                long past = System.currentTimeMillis() - begin;

                if (past > 500 ) {

                    log.info( "Flush data to disk costs {} ms" , past);

                }

            } catch (Throwable e) {

                CommitLog.log.warn( this .getServiceName() + " service has exception. " , e);

                this .printFlushProgress();

            }

        }

 

        // Normal shutdown, to ensure that all the flush before exit

        boolean result = false ;

        for ( int i = 0 ; i < RETRY_TIMES_OVER && !result; i++) {

            result = CommitLog. this .mappedFileQueue.flush( 0 );

            CommitLog.log.info( this .getServiceName() + " service shutdown, retry " + (i + 1 ) + " times " + (result ? "OK" : "Not OK" ));

        }

 

        this .printFlushProgress();

 

        CommitLog.log.info( this .getServiceName() + " service end" );

    }

 

    @Override

    public String getServiceName() {

        return FlushRealTimeService. class .getSimpleName();

    }

 

    private void printFlushProgress() {

        // CommitLog.log.info("how much disk fall behind memory, "

        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());

    }

 

    @Override

    public long getJointime() {

        return 1000 * 60 * 5 ;

    }

}

判断是否超过10秒没刷盘了,如果超过强制刷盘 等待Flush间隔500ms 通过 MappedFile 刷盘 设置 StoreCheckpoint 刷盘时间点 超过500ms的刷盘记录日志 Broker正常停止前,把内存page中的数据刷盘

开启缓冲池:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

class CommitRealTimeService extends FlushCommitLogService {

 

    private long lastCommitTimestamp = 0 ;

 

    @Override

    public String getServiceName() {

        return CommitRealTimeService. class .getSimpleName();

    }

 

    @Override

    public void run() {

        CommitLog.log.info( this .getServiceName() + " service started" );

        while (! this .isStopped()) {

            //每次提交间隔200毫秒

            int interval = CommitLog. this .defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

 

            //每次提交最少4页内存数据(16KB)

            int commitDataLeastPages = CommitLog. this .defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

 

            //距离上次提交时间阈值为200毫秒

            int commitDataThoroughInterval =

                CommitLog. this .defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

 

            long begin = System.currentTimeMillis();

            if (begin >= ( this .lastCommitTimestamp + commitDataThoroughInterval)) {

                this .lastCommitTimestamp = begin;

                commitDataLeastPages = 0 ;

            }

 

            try {

                boolean result = CommitLog. this .mappedFileQueue.commit(commitDataLeastPages);

                long end = System.currentTimeMillis();

                if (!result) {

                    this .lastCommitTimestamp = end; // result = false means some data committed.

                    //now wake up flush thread.

                    flushCommitLogService.wakeup();

                }

 

                if (end - begin > 500 ) {

                    log.info( "Commit data to file costs {} ms" , end - begin);

                }

                this .waitForRunning(interval);

            } catch (Throwable e) {

                CommitLog.log.error( this .getServiceName() + " service has exception. " , e);

            }

        }

 

        boolean result = false ;

        for ( int i = 0 ; i < RETRY_TIMES_OVER && !result; i++) {

            result = CommitLog. this .mappedFileQueue.commit( 0 );

            CommitLog.log.info( this .getServiceName() + " service shutdown, retry " + (i + 1 ) + " times " + (result ? "OK" : "Not OK" ));

        }

        CommitLog.log.info( this .getServiceName() + " service end" );

    }

}

RocketMQ 申请一块和 CommitLog 文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由 FlushRealTimeService 来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。

判断是否超过200毫秒没提交,需要强制提交 提交到 MappedFile ,此时还未刷盘 然后唤醒刷盘线程 在Broker正常停止前,提交内存page中的数据

到此这篇关于RocketMQ设计之异步刷盘的文章就介绍到这了,更多相关RocketMQ异步刷盘内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.51cto.com/u_15460453/5075814

查看更多关于RocketMQ设计之异步刷盘的详细内容...

  阅读:14次