今天來(lái)分享 RocketMQ 的定時(shí)任務(wù)。通過(guò)這些定時(shí)任務(wù),能讓我們更加理解 RocketMQ 的消息處理機(jī)制和設(shè)計(jì)理念。
從 RocketMQ 4.9.4 的源代碼上看,RocketMQ 的定時(shí)任務(wù)有很多,今天主要講解一些核心的定時(shí)任務(wù)。
1 架構(gòu)回顧
首先再來(lái)回顧一下 RocketMQ 的架構(gòu)圖:
Name Server 集群部署,但是節(jié)點(diǎn)之間并不會(huì)同步數(shù)據(jù),因?yàn)槊總€(gè)節(jié)點(diǎn)都會(huì)保存完整的數(shù)據(jù)。因此單個(gè)節(jié)點(diǎn)掛掉,并不會(huì)對(duì)集群產(chǎn)生影響。
Broker 可以采用主從集群部署,實(shí)現(xiàn)多副本存儲(chǔ)和高可用。每個(gè) Broker 節(jié)點(diǎn)都要跟所有的 Name Server 節(jié)點(diǎn)建立長(zhǎng)連接,定義注冊(cè) Topic 路由信息和發(fā)送心跳。
Producer 和 Consumer 跟 Name Server 的任意一個(gè)節(jié)點(diǎn)建立長(zhǎng)連接,定期從 Name Server 拉取 Topic 路由信息。
2 Producer 和 Consumer
2.1 獲取 NameServer 地址
Producer 和 Consumer 要跟 Name Server 建立連接,就必須首先獲取 Name Server 地址。Producer 和 Consumer 采用定時(shí)任務(wù)每?jī)煞昼姭@取 Name Server 地址并更新本地緩存。代碼如下:
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?fetchNameServerAddr?exception",?e); ??} ?} },?1000?*?10,?1000?*?60?*?2,?TimeUnit.MILLISECONDS);
2.2 更新路由信息
Producer 和 Consumer 會(huì)定時(shí)從 Name Server 獲取定時(shí)訂閱信息,更新本地緩存,默認(rèn)間隔是 30s(可以配置)。代碼如下
?
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.updateTopicRouteInfoFromNameServer(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?updateTopicRouteInfoFromNameServer?exception",?e); ??} ?} },?10,?this.clientConfig.getPollNameServerInterval(),?TimeUnit.MILLISECONDS);
?
?
2.3 向 Broker 發(fā)送心跳
Producer 和 Consumer 會(huì)從本地緩存的 Broker 列表中定時(shí)清除離線的 Broker,并且向 Broker 發(fā)送心跳,默認(rèn)間隔是 30s(可以配置)。代碼如下:
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.cleanOfflineBroker(); ???MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?sendHeartbeatToAllBroker?exception",?e); ??} ?} },?1000,?this.clientConfig.getHeartbeatBrokerInterval(),?TimeUnit.MILLISECONDS);
?
?
2.4 持久化 Offset
消費(fèi)者需要定時(shí)持久化 MessageQueue 的偏移量,默認(rèn)每 5s 更新一次(可以配置)。
注意:集群模式需要向 Broker 發(fā)送持久化消息,因?yàn)榧耗J狡屏勘4嬖?Broker 端,而廣播模式只需要把偏移量保存在消費(fèi)者本地文件。 代碼如下:
?
?
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.persistAllConsumerOffset(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?persistAllConsumerOffset?exception",?e); ??} ?} },?1000?*?10,?this.clientConfig.getPersistConsumerOffsetInterval(),?TimeUnit.MILLISECONDS);
?
?
2.5 調(diào)整核心線程數(shù)
對(duì)于消費(fèi)者采用推模式的情況,消費(fèi)者會(huì)根據(jù)未消費(fèi)的消息數(shù)量,定期更新核心線程數(shù),默認(rèn)每 1m 一次。
注意:在 4.9.4 這個(gè)版本,更新核心線程數(shù)的代碼并沒(méi)有實(shí)現(xiàn),只是預(yù)留了接口。 代碼如下:
?
?
//MQClientInstance類 this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???MQClientInstance.this.adjustThreadPool(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?adjustThreadPool?exception",?e); ??} ?} },?1,?1,?TimeUnit.MINUTES);
?
?
2.6 失效過(guò)期請(qǐng)求
Producer 和 Consumer 會(huì)定時(shí)掃描緩存在本地的請(qǐng)求,如果請(qǐng)求開(kāi)始時(shí)間加超時(shí)時(shí)間(再加 1s)小于當(dāng)前時(shí)間,則這個(gè)請(qǐng)求過(guò)期。通過(guò)定時(shí)任務(wù)(3s 一次)讓過(guò)期請(qǐng)求失效,并且觸發(fā)回調(diào)函數(shù)。
?
?
//NettyRemotingClient.java this.timer.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??try?{ ???NettyRemotingClient.this.scanResponseTable(); ??}?catch?(Throwable?e)?{ ???log.error("scanResponseTable?exception",?e); ??} ?} },?1000?*?3,?1000);
?
?
2.7 生產(chǎn)者
2.7.1 性能記錄
生產(chǎn)者發(fā)送消息后,會(huì)對(duì)成功失敗的狀態(tài)、花費(fèi)時(shí)間進(jìn)行記錄,以此來(lái)計(jì)算吞吐量 TPS,響應(yīng)時(shí)間 RT,代碼如下:
?
?
//Producer.java executorService.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??snapshotList.addLast(statsBenchmark.createSnapshot()); ??if?(snapshotList.size()?>?10)?{ ???snapshotList.removeFirst(); ??} ?} },?1000,?1000,?TimeUnit.MILLISECONDS); executorService.scheduleAtFixedRate(new?TimerTask()?{ ?private?void?printStats()?{ ??if?(snapshotList.size()?>=?10)?{ ???doPrintStats(snapshotList,??statsBenchmark,?false); ??} ?} ?@Override ?public?void?run()?{ ??try?{ ???this.printStats(); ??}?catch?(Exception?e)?{ ???e.printStackTrace(); ??} ?} },?10000,?10000,?TimeUnit.MILLISECONDS);
?
?
2.8 消費(fèi)者
2.8.1 MessageQueue 加鎖
對(duì)于順序消息,要保證同一個(gè) MessageQueue 只能被同一個(gè) Consumer 消費(fèi)。消費(fèi)者初始化的時(shí)候,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),定時(shí)(默認(rèn) 20s,可以配置)地向 Broker 發(fā)送鎖定消息,Broker 收到請(qǐng)求后,就會(huì)把 MessageQueue、group 和 clientId 進(jìn)行綁定,這樣其他客戶端就不能從這個(gè) MessageQueue 拉取消息。
代碼如下:
?
?
//ConsumeMessageOrderlyService.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???ConsumeMessageOrderlyService.this.lockMQPeriodically(); ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?lockMQPeriodically?exception",?e); ??} ?} },?1000?*?1,?ProcessQueue.REBALANCE_LOCK_INTERVAL,?TimeUnit.MILLISECONDS);
?
?
注意:Broker 的加鎖是有時(shí)效的(默認(rèn) 60s,可以配置),過(guò)期后,有可能被其他 Consumer 進(jìn)行消費(fèi)。
2.8.2 性能快照
Consumer 每秒會(huì)記錄一次性能快照,比如消息從創(chuàng)建到消費(fèi)花費(fèi)的時(shí)間,消息從保存到消費(fèi)花費(fèi)的時(shí)間,接收到消息的總數(shù)量,失敗總數(shù)量。代碼如下:
?
?
//Consumer.java executorService.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); ??if?(snapshotList.size()?>?10)?{ ???snapshotList.removeFirst(); ??} ?} },?1000,?1000,?TimeUnit.MILLISECONDS);
?
?
上面記錄了性能快照后,Consumer 會(huì)每隔 10s 進(jìn)行性能參數(shù)計(jì)算和打印。代碼如下:
?
?
//Consumer.java executorService.scheduleAtFixedRate(new?TimerTask()?{ private?void?printStats()?{ ?if?(snapshotList.size()?>=?10)?{ ??Long[]?begin?=?snapshotList.getFirst(); ??Long[]?end?=?snapshotList.getLast(); ??final?long?consumeTps?= ???(long)?(((end[1]?-?begin[1])?/?(double)?(end[0]?-?begin[0]))?*?1000L); ??final?double?averageB2CRT?=?(end[2]?-?begin[2])?/?(double)?(end[1]?-?begin[1]); ??final?double?averageS2CRT?=?(end[3]?-?begin[3])?/?(double)?(end[1]?-?begin[1]); ??final?long?failCount?=?end[4]?-?begin[4]; ??final?long?b2cMax?=?statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get(); ??final?long?s2cMax?=?statsBenchmarkConsumer.getStore2ConsumerMaxRT().get(); ??statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0); ??statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0); ??System.out.printf("Current?Time:?%s?TPS:?%d?FAIL:?%d?AVG(B2C)?RT(ms):?%7.3f?AVG(S2C)?RT(ms):?%7.3f?MAX(B2C)?RT(ms):?%d?MAX(S2C)?RT(ms):?%d%n", ????System.currentTimeMillis(),?consumeTps,?failCount,?averageB2CRT,?averageS2CRT,?b2cMax,?s2cMax ??); ?} }
?
?
通過(guò)性能參數(shù)的日志輸出,可以很方便的對(duì) RocketMQ 的消費(fèi)者進(jìn)行監(jiān)控。
2.8.3 清除過(guò)期消息
消費(fèi)者會(huì)定期檢查本地拉取的消息列表,如果列表中的消息已經(jīng)過(guò)期(默認(rèn) 15 分鐘過(guò)期,可以配置),則把過(guò)期消息再次發(fā)送給 Broker,然后從本地消息列表刪除。代碼如下:
?
?
//ConsumeMessageConcurrentlyService.java this.cleanExpireMsgExecutors.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???cleanExpireMsg(); ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?cleanExpireMsg?exception",?e); ??} ?} },?this.defaultMQPushConsumer.getConsumeTimeout(),?this.defaultMQPushConsumer.getConsumeTimeout(),?TimeUnit.MINUTES);
?
?
2.8.4 清除過(guò)期消息
消費(fèi)者會(huì)每隔 30s 向 NameServer 拉取 MessageQueue 信息,然后跟本地保存的進(jìn)行比較,如果不一致,則更新本地緩存信息。代碼如下:
?
?
//DefaultLitePullConsumerImpl.java scheduledExecutorService.scheduleAtFixedRate( new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???fetchTopicMessageQueuesAndCompare(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?fetchMessageQueuesAndCompare?exception",?e); ??} ?} },?1000?*?10,?this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(),?TimeUnit.MILLISECONDS);
?
?
3 Broker
3.1 狀態(tài)采樣
Broker 端會(huì)對(duì)狀態(tài)進(jìn)行采用,比如一個(gè) Topic、MessageQueue、Group 總共發(fā)送了多少條消息,Topic 總共發(fā)送的消息大小。Broker 會(huì)對(duì)這些狀態(tài)按照秒、分鐘、小時(shí)為單位進(jìn)行采樣并且定時(shí)打印,這里一共有 6 個(gè)定時(shí)任務(wù)。比如下面是按照秒進(jìn)行采樣的定時(shí)任務(wù):
?
?
//StatsItemSet.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???samplingInSeconds(); ??}?catch?(Throwable?ignored)?{ ??} ?} },?0,?10,?TimeUnit.SECONDS);
?
?
3.2 記錄消息延時(shí)
Broker 讀取消息時(shí)會(huì)記錄消息從保存磁盤到被讀取的時(shí)間差并定時(shí)打印。定時(shí)任務(wù)代碼如下:
?
?
//MomentStatsItemSet.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???printAtMinutes(); ??}?catch?(Throwable?ignored)?{ ??} ?} },?Math.abs(UtilAll.computeNextMinutesTimeMillis()?-?System.currentTimeMillis()),?1000?*?60?*?5,?TimeUnit.MILLISECONDS);
?
?
3.3 持久化數(shù)據(jù)
Broker 會(huì)定時(shí)持久化消費(fèi)偏移量、Topic 配置、定閱組配置等,默認(rèn) 10s 一次(可以配置)。代碼如下:
?
?
//ScheduleMessageService.java this.deliverExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???if?(started.get())?{ ????ScheduleMessageService.this.persist(); ???} ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?flush?exception",?e); ??} ?} },?10000,?this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(),?TimeUnit.MILLISECONDS);
?
?
3.4 失效過(guò)期請(qǐng)求
Broker 會(huì)定時(shí)掃描緩存在本地的請(qǐng)求,如果請(qǐng)求開(kāi)始時(shí)間加超時(shí)時(shí)間(再加 1s)小于當(dāng)前時(shí)間,則這個(gè)請(qǐng)求過(guò)期。通過(guò)定時(shí)任務(wù)(3s 一次)讓過(guò)期請(qǐng)求失效,并且觸發(fā)回調(diào)函數(shù)。
?
?
//NettyRemotingServer.java this.timer.scheduleAtFixedRate(new?TimerTask()?{ ?@Override ?public?void?run()?{ ??try?{ ???NettyRemotingServer.this.scanResponseTable(); ??}?catch?(Throwable?e)?{ ???log.error("scanResponseTable?exception",?e); ??} ?} },?1000?*?3,?1000);
?
?
3.5 過(guò)濾服務(wù)
消費(fèi)者可能會(huì)向 Broker 注冊(cè) filterClass 用來(lái)過(guò)濾消息。Broker 收到消費(fèi)者注冊(cè)的 filterClass 后會(huì)用定時(shí)任務(wù)來(lái)創(chuàng)建 FilterServer。代碼如下:
?
?
//FilterServerManager.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???FilterServerManager.this.createFilterServer(); ??}?catch?(Exception?e)?{ ???log.error("",?e); ??} ?} },?1000?*?5,?1000?*?30,?TimeUnit.MILLISECONDS);
?
?
這樣消費(fèi)者拉取消息時(shí)首先從 FilterServer 拉取消息,F(xiàn)ilterServer 從 Broker 拉取消息后進(jìn)行過(guò)濾,只把消費(fèi)者感興趣的消息返回給消費(fèi)者。一個(gè) Broker 可以有多個(gè) FilterServer。如下圖:
3.6 記錄消息總量
Broker 每天會(huì)記錄前一天收發(fā)消息的總數(shù)量,定時(shí)任務(wù)如下(period 是 1 天):
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.getBrokerStats().record(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?record?error.",?e); ??} ?} },?initialDelay,?period,?TimeUnit.MILLISECONDS);
?
?
3.7 持久化 Offset
Broker 默認(rèn)每隔 5s(可以配置) 會(huì)持久化一次消息的 Offset,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.consumerOffsetManager.persist(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?persist?consumerOffset?error.",?e); ??} ?} },?1000?*?10,?this.brokerConfig.getFlushConsumerOffsetInterval(),?TimeUnit.MILLISECONDS);
?
?
3.8 持久化過(guò)濾參數(shù)
上面提到過(guò),消費(fèi)者可能會(huì)向 Broker 注冊(cè) filterClass,Broker 解析消費(fèi)者注冊(cè)的 filterClass 后,會(huì)把解析后的 FilterData 持久化到文件,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.consumerFilterManager.persist(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?persist?consumer?filter?error.",?e); ??} ?} },?1000?*?10,?1000?*?10,?TimeUnit.MILLISECONDS);
?
?
3.9 Broker 自我保護(hù)
當(dāng)消費(fèi)者讀取消息緩慢時(shí),Broker 為了保護(hù)自己,會(huì)把這個(gè)消費(fèi)者設(shè)置為不允許讀取的狀態(tài),這樣這個(gè)消費(fèi)組就不能再拉取消息了,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.protectBroker(); ??}?catch?(Throwable?e)?{ ???log.error("protectBroker?error.",?e); ??} ?} },?3,?3,?TimeUnit.MINUTES);
?
?
3.10 Broker 打印水位
Broker 會(huì)每隔 1s 打印一次水位,包括發(fā)送消息的延遲、接收消息的延遲、事務(wù)消息的延遲、查詢消息的延遲,代碼如下:
?
?
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.printWaterMark(); ??}?catch?(Throwable?e)?{ ???log.error("printWaterMark?error.",?e); ??} ?} },?10,?1,?TimeUnit.SECONDS);
?
?
3.11 Broker 打印 Offset 差
Broker 會(huì)定時(shí)打印最新的消息 Offset 和已經(jīng)分發(fā)給 MessageQueue 和 Index 索引的 Offset 差距,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???log.info("dispatch?behind?commit?log?{}?bytes",?BrokerController.this.getMessageStore().dispatchBehindBytes()); ??}?catch?(Throwable?e)?{ ???log.error("schedule?dispatchBehindBytes?error.",?e); ??} ?} },?1000?*?10,?1000?*?60,?TimeUnit.MILLISECONDS);
3.12 獲取 NameServer 地址
Broker 會(huì)定期獲取 NameServer 的地址,并更新本地緩存,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); ??}?catch?(Throwable?e)?{ ???log.error("ScheduledTask?fetchNameServerAddr?exception",?e); ??} ?} },?1000?*?10,?1000?*?60?*?2,?TimeUnit.MILLISECONDS);
3.13 打印主從偏移量差距
Broker 會(huì)定時(shí)打印 master 節(jié)點(diǎn)和 slave 節(jié)點(diǎn)消息 Offset 的差距,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.printMasterAndSlaveDiff(); ??}?catch?(Throwable?e)?{ ???log.error("schedule?printMasterAndSlaveDiff?error.",?e); ??} ?} },?1000?*?10,?1000?*?60,?TimeUnit.MILLISECONDS);
3.14 向 NameServer 注冊(cè)
Broker 會(huì)定時(shí)向(默認(rèn) 30s,可配置,最高不超過(guò) 60s)所有 NameServer 發(fā)送注冊(cè)消息,代碼如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.registerBrokerAll(true,?false,?brokerConfig.isForceRegister()); ??}?catch?(Throwable?e)?{ ???log.error("registerBrokerAll?Exception",?e); ??} ?} },?1000?*?10,?Math.max(10000,?Math.min(brokerConfig.getRegisterNameServerPeriod(),?60000)),?TimeUnit.MILLISECONDS);
3.15 同步 Slave
Broker 的 Master 節(jié)點(diǎn)會(huì)每間隔 10s 向 Slave 節(jié)點(diǎn)同步數(shù)據(jù),包括 Topic 配置、消費(fèi)偏移量、延遲偏移量、消費(fèi)組配置,代碼如下:
//BrokerController.java slaveSyncFuture?=?this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???BrokerController.this.slaveSynchronize.syncAll(); ??} ??catch?(Throwable?e)?{ ???log.error("ScheduledTask?SlaveSynchronize?syncAll?error.",?e); ??} ?} },?1000?*?3,?1000?*?10,?TimeUnit.MILLISECONDS);
3.16 刪除過(guò)期文件
Broker 會(huì)周期性(默認(rèn) 10s,可以配置)地執(zhí)行刪除任務(wù),刪除過(guò)期的 CommitLog 文件和 ConsumeQueue 文件,代碼如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??DefaultMessageStore.this.cleanFilesPeriodically(); ?} },?1000?*?60,?this.messageStoreConfig.getCleanResourceInterval(),?TimeUnit.MILLISECONDS);
3.17 文件大小檢查
Broker 會(huì)每隔 10 分鐘檢查 CommitLog 文件和 ConsumeQueue 文件,用當(dāng)前文件的最?。ㄆ鹗迹?Offset 減去上一個(gè)文件最?。ㄆ鹗迹?Offset,如果不等于一個(gè)文件的大小,就說(shuō)明文件存在問(wèn)題。代碼如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??DefaultMessageStore.this.checkSelf(); ?} },?1,?10,?TimeUnit.MINUTES);
3.18 保存堆棧映射
Broker 會(huì)每隔 1s 記錄所有存活線程的堆棧映射信息,前提是 debugLockEnable 開(kāi)關(guān)配置是打開(kāi)的。代碼如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??if?(DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable())?{ ???try?{ ????if?(DefaultMessageStore.this.commitLog.getBeginTimeInLock()?!=?0)?{ ?????long?lockTime?=?System.currentTimeMillis()?-?DefaultMessageStore.this.commitLog.getBeginTimeInLock(); ?????if?(lockTime?>?1000?&&?lockTime?10000000)?{ ??????String?stack?=?UtilAll.jstack(); ??????final?String?fileName?=?System.getProperty("user.home")?+?File.separator?+?"debug/lock/stack-" ???????+?DefaultMessageStore.this.commitLog.getBeginTimeInLock()?+?"-"?+?lockTime; ??????MixAll.string2FileNotSafe(stack,?fileName); ?????} ????} ???}?catch?(Exception?e)?{ ???} ??} ?} },?1,?1,?TimeUnit.SECONDS);
3.19 檢查物理磁盤
Broker 會(huì)每隔 10s 檢查保存 CommitLog 的磁盤空間是否達(dá)到閾值,如果達(dá)到,會(huì)打印 error 級(jí)別的日志。代碼如下:
//DefaultMessageStore.java this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?public?void?run()?{ ??DefaultMessageStore.this.cleanCommitLogService.isSpaceFull(); ?} },?1000L,?10000L,?TimeUnit.MILLISECONDS);
3.20 持久化延時(shí)消息偏移量
RocketMQ 的延時(shí)消費(fèi)分為 18 個(gè)級(jí)別,定義如下:
//ScheduleMessageService.java private?String?messageDelayLevel?=?"1s?5s?10s?30s?1m?2m?3m?4m?5m?6m?7m?8m?9m?10m?20m?30m?1h?2h";
RocketMQ 會(huì)為每個(gè)延時(shí)級(jí)別定義要給 ConsumeQueue,每隔 ConsumeQueue 都會(huì)有一個(gè) Offset,通過(guò) offsetTable(ConcurrentMap) 來(lái)記錄不同延時(shí)級(jí)別對(duì)應(yīng)的 Offset。
RocketMQ 會(huì)周期性地(默認(rèn) 10s,可以配置)把 offsetTable 中保存的 Offset 持久化到文件。代碼如下:
//DefaultMessageStore.java this.deliverExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???if?(started.get())?{ ????ScheduleMessageService.this.persist(); ???} ??}?catch?(Throwable?e)?{ ???log.error("scheduleAtFixedRate?flush?exception",?e); ??} ?} },?10000,?this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(),?TimeUnit.MILLISECONDS);
3.21 關(guān)閉異常連接
Broker 會(huì)定時(shí)掃描所有的長(zhǎng)連接,主要包括生產(chǎn)者、消費(fèi)者和 FilterServer,如果連接不活躍,則關(guān)閉該連接,并從本地連接列表中移除。代碼如下:
//ClientHousekeepingService.java this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???ClientHousekeepingService.this.scanExceptionChannel(); ??}?catch?(Throwable?e)?{ ???log.error("Error?occurred?when?scan?not?active?client?channels.",?e); ??} ?} },?1000?*?10,?1000?*?10,?TimeUnit.MILLISECONDS);
3.22 清理過(guò)期消息
如果 Broker 配置了允許快速失?。╞rokerFastFailureEnable),則會(huì)每隔 10ms 定時(shí)清理過(guò)期請(qǐng)求,包括要發(fā)送的消息、接收的消息、心跳消息、要結(jié)束的事務(wù)消息。代碼如下:
scheduledExecutorService.scheduleAtFixedRate( new?Runnable()?{ ?@Override ?public?void?run()?{ ??try?{ ???fetchTopicMessageQueuesAndCompare(); ??}?catch?(Exception?e)?{ ???log.error("ScheduledTask?fetchMessageQueuesAndCompare?exception",?e); ??} ?} },?1000?*?10,?this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(),?TimeUnit.MILLISECONDS);
注意:清理消息前會(huì)判斷是否系統(tǒng)繁忙,如果系統(tǒng)繁忙,會(huì)給發(fā)送隊(duì)列中的消息直接返回系統(tǒng)繁忙,暫時(shí)不做過(guò)期消息清理。
4 NameServer
4.1 檢查過(guò)期 Broker
在 3.14 節(jié)中講過(guò),Broker 會(huì)跟 NameServer 建立長(zhǎng)連接,定時(shí)向 NameServer 發(fā)送注冊(cè)消息。NameServer 會(huì)在本地維護(hù)一個(gè) Broker 列表,定時(shí)任務(wù)會(huì)輪詢本地保存的 Broker 列表,檢查注冊(cè)消息是否過(guò)期(超過(guò) 120s),如果注冊(cè)消息過(guò)期,則關(guān)閉長(zhǎng)連接,從本地緩存刪除這個(gè) Broker。代碼如下:
//NamesrvController.java this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,?5,?10,?TimeUnit.SECONDS);
4.2 打印配置
NameServer 啟動(dòng)時(shí),會(huì)加載 KV 格式的配置文件到 configTable 這個(gè)變量,NameServer 客戶端也可以發(fā)送一個(gè) KV 配置請(qǐng)求給 NameServer,NameServer 收到請(qǐng)求后也會(huì)保存到 configTable。
NameServer 會(huì)定時(shí)打印 configTable 中的配置,代碼如下:
//NamesrvController.java this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,?1,?10,?TimeUnit.MINUTES);
5 總結(jié)
RocketMQ 的定時(shí)任務(wù)很多,這些定時(shí)任務(wù)的加入讓 RocketMQ 的設(shè)計(jì)更加完備,包括業(yè)務(wù)處理、監(jiān)控日志、心跳、清理任務(wù)、關(guān)閉連接、持久化數(shù)據(jù)等。通過(guò)對(duì)定時(shí)任務(wù)的理解,能夠更深入地理解 RocketMQ 的設(shè)計(jì)理念。
編輯:黃飛
?
評(píng)論
查看更多