首页 > 经验记录 > 解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)

解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)

 
上一篇文章:  RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗? 里最终得出的结论:
RabbitMQ 死信机制可以作为延时任务这个场景的解决方案
但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景  下的解决方案。
如果说,果真我的业务就是有这个需求呢?
既需要延时触发、也可以满足延时时间不定长
 

一、解决方案

本身 RabbitMQ 没有这种功能,不过仍然可以使用 RabbitMQ 解决这个场景。
那就是使用插件大法。这也应该是使用 RabbitMQ 时,除了管控台插件外用的最多的一个插件。
需要用到的插件就是这个: rabbitmq_delayed_message_exchange 插件
见名思意,延时消息交换机;  对,他的实现方式已经和队列已经无关了。
这个插件启用后的作用是在原来的 direct、topic、fanout 等这些 exchange 基础上,又新加了一个 exchange 。这个 exchange 的类型是 x-delayed-message
只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时
 

二、如何安装插件

之前安装 RabbitMQ 的时候,那是真的搞了我一段时间。可以看这篇文章 –> RabbitMQ基本简介与我亲身经历的安装流程(CentOS7)
现在都 0202 年了,还这么搞就太挫了。其实早在N久之前我电脑上的 RabbitMQ 就已经改成了 docker 运行。
那么就在这次插件安装过程中来顺便说一下。
 
首先 docker 安装 RabbitMQ 很简单,这是安装并运行 RabbitMQ3.7.7 的命令
还自带了管控台插件,只需配置好端口映射/文件夹映射,并设置下默认账号密码就完事了

docker run --name rabbitmq -dt\
  -v /opt/dockerdata/rabbitmq:/var/lib/rabbitmq\
  -p 5672:5672 -p 15672:15672\
  --hostname rabbit\
  -e RABBITMQ_DEFAULT_VHOST=/\
  -e RABBITMQ_DEFAULT_USER=admin\
  -e RABBITMQ_DEFAULT_PASS=614\
  rabbitmq:3.7.7-management

 
好,只要有了RabbitMQ 接下来就只需要安装插件并启用了。
这是插件的github地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
先将兼容版本的插件下载到本地, 然后复制进docker容器内执行 rabbitmq-plugins 命令启用就OK了

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
rm rabbitmq_delayed_message_exchange-3.8.0.ez
docker exec -it rabbitmq bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 
 
 

三、在项目中使用

那么现在万事俱备了,就只需要在项目中实际的使用起来,测试一下是否真能达到想要的效果。
下面的代码都会排除干扰项,删掉之前写的所有和本次无关的配置,以便于阅读
 
3.1、 首先,当然还是先创建绑定关系。

/**
 * Rabbitmq的绑定配置,绑定Exchange、MQ、RoutingKey
 * Create by skypyb on 2019-11-16
 */
@Configuration
public class RabbitBindConfig {
    public final static String SKYPYB_DELAY_EXCHANGE = "skypyb-delay-exchange";
    public final static String SKYPYB_DELAY_QUEUE = "skypyb-delay-queue";
    public final static String SKYPYB_DELAY_KEY = "skypyb.key.delay";
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //自定义交换机
        return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args);
    }
    @Bean
    public Queue delayQueue() {
        return new Queue(SKYPYB_DELAY_QUEUE, false, false, true);
    }
    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs();
    }
}

 
值得注意的是,在使用插件给我们带来的新的延迟交换机时, 由于 SpringAMQP 中并没有内置这种模型,所以需要创建 CustomExchange,也就是自定义交换机。
并且需要设置 CustomExchange 的类型为 x-delayed-message
至于队列和绑定关系的设置,该怎么配就怎么配。
在创建绑定关系时,最终需要调用一下 noargs() 方法,BindingBuilder 在绑定 CustomExchange 时 with() 方法返回值并不会是 Binding 类。
 
3.2、消费者编写
在绑定关系创建完毕之后,对应的消费者也是需要的。
其实这个消费者没有任何特殊的地方,毕竟使用了此插件也只是交换机和发消息时要做出改变,队列本身是没有变化的。

@RabbitListener(queues = {RabbitBindConfig.SKYPYB_DELAY_QUEUE})
@Component
public class DelayReceiver {
    private Logger logger = LoggerFactory.getLogger(DelayReceiver.class);
    @RabbitHandler
    public void onDelayMessage(@Payload String message,
                              @Headers Map<String, Object> headers,
                              Channel channel) throws IOException {
        logger.info("监听延时交换机, 收到消息: {}", message);
        //delivery tag可以从headers中get出来
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
            channel.basicNack(deliveryTag, false, !redelivered);
        }
    }
}

 
3.3、测试延时消息发送
那么现在是 “真” 万事俱备。
写个测试类,来往指定的交换机里发送消息。这里当然是向我们刚创建的延时交换机发消息啦。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitmqTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Logger logger = LoggerFactory.getLogger(RabbitmqTest.class);
    @Test
    public void testDelay() {
        rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
                RabbitBindConfig.SKYPYB_DELAY_KEY, "消息体-5s",
                (msg) -> {
                    msg.getMessageProperties().setDelay(5000);
                    return msg;
                });
        rabbitTemplate.convertAndSend(
                RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
                RabbitBindConfig.SKYPYB_DELAY_KEY,
                "消息体-3s",
                (msg) -> {
                    msg.getMessageProperties().setDelay(3000);
                    return msg;
                });
        logger.info("-----消息发送完毕-----");
    }
}

在发送消息的地方,也是需要做出处理的。
可以通过以下方法来设置消息的 Header。来达到指定延时时间的目的。

message.getMessageProperties().setHeader("x-delay",3000);

但是有一点很奇妙的是 SpringAMQP 他居然自己集成了对应的API (那为啥不集成延时交换机的API? )
所以可以通过这个方式来设置延时时间:

message.getMessageProperties().setDelay(3000);

 
 
最后,代码均编写完毕。
启动消费者服务用以监听队列,然后启动测试类观察消息投放,
 
最终控制台打印:

2020-01-18 12:26:28.808 INFO 24592 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–
2020-01-18 12:26:31.827 INFO 22844 — [cTaskExecutor-1] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-3s
2020-01-18 12:26:33.813 INFO 22844 — [cTaskExecutor-3] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-5s

 
可以看到其完美的符合了需求。
 
 

结语

延时任务这个场景具体的解决方案也就差不多到这了。
死信机制除了比较复杂的延时场景以外,其实也可以满足大多数需求。
那么若是遇到了死信也解决不了的延时场景,RabbitMQ 本身的机制无法实现的话,那么我们可以靠插件来实现对应的需求。
确确实实,RabbitMQ 的这个延时交换机插件还是有点东西的,也难怪 Spring 给他集成了对应的 API。
看了这两篇文章的人,以后若是遇到对应的场景,该用什么就不用我多说了吧 (
 

           


CAPTCHAis initialing...

2 COMMENTS

  1. 股票历史数据下载2020-01-19 15:55

    学习了学习了 :grin:

EA PLAYER &

历史记录 [ 注意:部分数据仅限于当前浏览器 ]清空

      00:00/00:00