RocketMQ——消息文件过期原理

发布时间:2022-06-08 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了RocketMQ——消息文件过期原理脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

 过期清理

所有的消费均是客户端发起Pull请求的,告诉消息的offset位置,broker去查询并返回。但是有一点需要非常明确的是,消息消费后,消息其实并没有物理地被清除,这是一个非常特殊的设计。本文来探索此设计的一些细节。

消费完后的消息去哪里了?

消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。

但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

什么时候清理物理消息文件?

那消息文件到底删不删,什么时候删?

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):

  1. 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
  2. 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
  3. 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

这样设计带来的好处

消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:

  1. 一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——  一个消息无需复制N份,就可服务N个消费组。

  2. 由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。

  3. 消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。

注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间节点)后的附近观察得到。

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作相比Ext3有非常明显的提升。

跳过历史消息的处理

由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景—— 一个消息如果太老,是无需要被消费的,是不合适的。

这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?

对于一个全新的消费组,PushConsumer默认就是跳过以前的消息而从最尾开始消费的。

但对于已存在的消费组,RocketMQ没有内置的跳过历史消息的实现,但有以下手段可以解决:

  1. 自身的消费代码按照日期过滤,太老的消息直接过滤。如:

     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         for(MessageExt msg: msgs){
             if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期
                 continue;//过期消息跳过
             }

             //do consume here

         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }

  2.自身的消费代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。

     @Override
     public ConsumeConcurrentlyStatus consumeMessage(//
         List<MessageExt> msgs, //
         ConsumeConcurrentlyContext context) {
         long offset = msgs.get(0).getQueueOffset();
         String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
         long diff = Long. parseLong(maxOffset) - offset;
         if (diff > 100000) { //消息堆积了10W情况的特殊处理
             return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
         }
         //do consume here
         return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
     }

 

   3.消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用控制台命令resetOffsetByTime把消费进度调整到后面,再启动消费。

 

  4.原理同3,但使用代码来控制。代码中调用内部的运维接口,具体代码实例祥见ResetOffsetByTimeCommand.java.

 

 转载:https://jaskey.github.io/blog/2017/02/16/rocketmq-clean-commitlog/

 

 

 

 
TRANSLATE with RocketMQ——消息文件过期原理 x
English
Arabic Hebrew Polish
Bulgarian Hindi Portuguese
Catalan Hmong Daw Romanian
Chinese Simplified Hungarian Russian
Chinese Traditional Indonesian Slovak
Czech Italian Slovenian
Danish Japanese Spanish
Dutch Klingon Swedish
English Korean Thai
Estonian Latvian Turkish
Finnish Lithuanian Ukrainian
French Malay Urdu
German Maltese Vietnamese
Greek Norwegian Welsh
Haitian Creole Persian  
RocketMQ——消息文件过期原理
RocketMQ——消息文件过期原理 RocketMQ——消息文件过期原理 RocketMQ——消息文件过期原理
 
TRANSLATE with RocketMQ——消息文件过期原理
COPY THE URL BELOW
RocketMQ——消息文件过期原理
RocketMQ——消息文件过期原理 Back
EMBED THE SNIPPET BELOW IN YOUR SITE RocketMQ——消息文件过期原理
Enable collaborative features and customize widget: Bing Webmaster Portal
Back
 
 
此页面的语言为中文(简体)
 
翻译为
 
 
 
 
  • 中文(简体)
  • 中文(繁体)
  • 丹麦语
  • 乌克兰语
  • 乌尔都语
  • 亚美尼亚语
  • 俄语
  • 保加利亚语
  • 克罗地亚语
  • 冰岛语
  • 加泰罗尼亚语
  • 匈牙利语
  • 卡纳达语
  • 印地语
  • 印尼语
  • 古吉拉特语
  • 哈萨克语
  • 土耳其语
  • 威尔士语
  • 孟加拉语
  • 尼泊尔语
  • 布尔语(南非荷兰语)
  • 希伯来语
  • 希腊语
  • 库尔德语
  • 德语
  • 意大利语
  • 拉脱维亚语
  • 挪威语
  • 捷克语
  • 斯洛伐克语
  • 斯洛文尼亚语
  • 旁遮普语
  • 日语
  • 普什图语
  • 毛利语
  • 法语
  • 波兰语
  • 波斯语
  • 泰卢固语
  • 泰米尔语
  • 泰语
  • 海地克里奥尔语
  • 爱沙尼亚语
  • 瑞典语
  • 立陶宛语
  • 缅甸语
  • 罗马尼亚语
  • 老挝语
  • 芬兰语
  • 英语
  • 荷兰语
  • 萨摩亚语
  • 葡萄牙语
  • 西班牙语
  • 越南语
  • 阿塞拜疆语
  • 阿姆哈拉语
  • 阿尔巴尼亚语
  • 阿拉伯语
  • 韩语
  • 马尔加什语
  • 马拉地语
  • 马拉雅拉姆语
  • 马来语
  • 马耳他语
  • 高棉语
 

脚本宝典总结

以上是脚本宝典为你收集整理的RocketMQ——消息文件过期原理全部内容,希望文章能够帮你解决RocketMQ——消息文件过期原理所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。
标签: