DevilKing's blog

冷灯看剑,剑上几分功名?炉香无需计苍生,纵一穿烟逝,万丈云埋,孤阳还照古陵

0%

延迟队列

原文链接

业务场景:

  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?
  • 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?等等

设计目标:

  • 消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。
  • Client支持丰富:由于业务上的需求,至少支持PHP和Python。
  • 高可用性:至少得支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。
  • 实时性:允许存在一定的时间误差。
  • 支持消息删除:业务使用方,可以随时删除指定消息。

设计结构

msg结构:

  • Topic:Job类型。可以理解成具体的业务名称。
  • Id:Job的唯一标识。用来检索和删除指定的Job信息。
  • Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
  • TTR(time-to-run):Job执行超时时间。单位:秒。
  • Body:Job的内容,供消费者做具体的业务处理,以json格式存储。

一个msg的生命周期:

  • 用户对某个商品下单,系统创建订单成功,同时往延迟队列里put一个job。job结构为:{‘topic’:’orderclose’, ‘id’:’ordercloseorderNoXXX’, ‘delay’:1800 ,’TTR’:60 , ‘body’:’XXXXXXX’}
  • 延迟队列收到该job后,先往job pool中存入job信息,然后根据delay计算出绝对执行时间,并以轮询(round-robbin)的方式将job id放入某个bucket。
  • timer每时每刻都在轮询各个bucket,当1800秒(30分钟)过后,检查到上面的job的执行时间到了,取得job id从job pool中获取元信息。如果这时该job处于deleted状态,则pass,继续做轮询;如果job处于非deleted状态,首先再次确认元信息中delay是否大于等于当前时间,如果满足则根据topic将job id放入对应的ready queue,然后从bucket中移除;如果不满足则重新计算delay时间,再次放入bucket,并将之前的job id从bucket中移除。
  • 消费端轮询对应的topic的ready queue(这里仍然要判断该job的合理性),获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
  • 消费端处理完业务后向服务端响应finish,服务端根据job id删除对应的元信息。

  • timer是通过独立线程的无限循环来实现,在没有ready job的时候会对CPU造成一定的浪费。
  • 消费端在reserve job的时候,采用的是http短轮询的方式,且每次只能取的一个job。如果ready job较多的时候会加大网络I/O的消耗。
  • 数据存储使用的redis,消息在持久化上受限于redis的特性。

  • 基于wait/notify方式的Timer实现

  • 提供TCP长连的API,实现push或者long-polling的消息reserve方法

  • 拥有自己的存储方案(内嵌数据库、自定义数据结构写文件),确保消息的持久化

  • 考虑提供周期性任务的直接支持