首页 > 经验记录 > java > 解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)

解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)

 

上一篇文章:  RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗? 里最终得出的结论:

RabbitMQ 死信机制可以作为延时任务这个场景的解决方案

但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景  下的解决方案。

如果说,果真我的业务就是有这个需求呢?

既需要延时触发、也可以满足延时时间不定长

 

一、解决方案

本身 RabbitMQ 没有这种功能,不过仍然可以使用 RabbitMQ 解决这个场景。

那就是使用插件大法。这也应该是使用 RabbitMQ 时,除了管控台插件外用的最多的一个插件。

需要用到的插件就是这个: rabbitmq_delayed_message_exchange 插件

见名思意,延时消息交换机;  对,他的实现方式已经和队列已经无关了。

这个插件启用后的作用是在原来的 direct、topic、fanout 等这些 exchange 基础上,又新加了一个 exchange 。这个 exchange 的类型是 x-delayed-message

只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时

 

二、如何安装插件

之前安装 RabbitMQ 的时候,那是真的搞了我一段时间。可以看这篇文章 –> RabbitMQ基本简介与我亲身经历的安装流程(CentOS7)

现在都 0202 年了,还这么搞就太挫了。其实早在N久之前我电脑上的 RabbitMQ 就已经改成了 docker 运行。

那么就在这次插件安装过程中来顺便说一下。

 

首先 docker 安装 RabbitMQ 很简单,这是安装并运行 RabbitMQ3.7.7 的命令

还自带了管控台插件,只需配置好端口映射/文件夹映射,并设置下默认账号密码就完事了

 

好,只要有了RabbitMQ 接下来就只需要安装插件并启用了。

这是插件的github地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

先将兼容版本的插件下载到本地, 然后复制进docker容器内执行 rabbitmq-plugins 命令启用就OK了

 

 

 

三、在项目中使用

那么现在万事俱备了,就只需要在项目中实际的使用起来,测试一下是否真能达到想要的效果。

下面的代码都会排除干扰项,删掉之前写的所有和本次无关的配置,以便于阅读

 

3.1、 首先,当然还是先创建绑定关系。

 

值得注意的是,在使用插件给我们带来的新的延迟交换机时, 由于 SpringAMQP 中并没有内置这种模型,所以需要创建 CustomExchange,也就是自定义交换机。

并且需要设置 CustomExchange 的类型为 x-delayed-message

至于队列和绑定关系的设置,该怎么配就怎么配。

在创建绑定关系时,最终需要调用一下 noargs() 方法,BindingBuilder 在绑定 CustomExchange 时 with() 方法返回值并不会是 Binding 类。

 

3.2、消费者编写

在绑定关系创建完毕之后,对应的消费者也是需要的。

其实这个消费者没有任何特殊的地方,毕竟使用了此插件也只是交换机和发消息时要做出改变,队列本身是没有变化的。

 

3.3、测试延时消息发送

那么现在是 “真” 万事俱备。

写个测试类,来往指定的交换机里发送消息。这里当然是向我们刚创建的延时交换机发消息啦。

在发送消息的地方,也是需要做出处理的。

可以通过以下方法来设置消息的 Header。来达到指定延时时间的目的。

但是有一点很奇妙的是 SpringAMQP 他居然自己集成了对应的API (那为啥不集成延时交换机的API? )

所以可以通过这个方式来设置延时时间:

 

 

最后,代码均编写完毕。

启动消费者服务用以监听队列,然后启动测试类观察消息投放,

 

最终控制台打印:

2020-01-18 12:26:28.808 INFO 24592 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-18 12:26:31.827 INFO 22844 — [cTaskExecutor-1] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-3s

2020-01-18 12:26:33.813 INFO 22844 — [cTaskExecutor-3] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-5s

 

可以看到其完美的符合了需求。

 

 

结语

延时任务这个场景具体的解决方案也就差不多到这了。

死信机制除了比较复杂的延时场景以外,其实也可以满足大多数需求。

那么若是遇到了死信也解决不了的延时场景,RabbitMQ 本身的机制无法实现的话,那么我们可以靠插件来实现对应的需求。

确确实实,RabbitMQ 的这个延时交换机插件还是有点东西的,也难怪 Spring 给他集成了对应的 API。

看了这两篇文章的人,以后若是遇到对应的场景,该用什么就不用我多说了吧 (

 

           


1 COMMENT

  1. 股票历史数据下载2020-01-19 15:55

    学习了学习了 :grin:

EA PLAYER &

历史记录 [ 注意:部分数据仅限于当前浏览器 ]清空

      00:00/00:00