首页 > 经验记录

 

上一篇文章:  RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗? 里最终得出的结论:

RabbitMQ 死信机制可以作为延时任务这个场景的解决方案

但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景  下的解决方案。

如果说,果真我的业务就是有这个需求呢?

既需要延时触发、也可以满足延时时间不定长

 

一、解决方案

本身 RabbitMQ 没有这种功能,不过仍然可以使用 RabbitMQ 解决这个场景。

那就是使用插件大法。这也应该是使用 RabbitMQ 时,除了管控台插件外用的最多的一个插件。

需要用到的插件就是这个: rabbitmq_delayed_message_exchange 插件

见名思意,延时消息交换机;  对,他的实现方式已经和队列已经无关了。

这个插件启用后的作用是在原来的 direct、topic、fanout 等这些 exchange 基础上,又新加了一个 exchange 。这个 exchange 的类型是 x-delayed-message

只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时

 

二、如何安装插件

之前安装 RabbitMQ 的时候,那是真的搞了我一段时间。可以看这篇文章 –> RabbitMQ基本简介与我亲身经历的安装流程(CentOS7)

现在都 0202 年了,还这么搞就太挫了。其实早在N久之前我电脑上的 RabbitMQ 就已经改成了 docker 运行。

那么就在这次插件安装过程中来顺便说一下。

 

首先 docker 安装 RabbitMQ 很简单,这是安装并运行 RabbitMQ3.7.7 的命令

还自带了管控台插件,只需配置好端口映射/文件夹映射,并设置下默认账号密码就完事了

 

好,只要有了RabbitMQ 接下来就只需要安装插件并启用了。

这是插件的github地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

先将兼容版本的插件下载到本地, 然后复制进docker容器内执行 rabbitmq-plugins 命令启用就OK了

 

 

 

三、在项目中使用

那么现在万事俱备了,就只需要在项目中实际的使用起来,测试一下是否真能达到想要的效果。

下面的代码都会排除干扰项,删掉之前写的所有和本次无关的配置,以便于阅读

 

3.1、 首先,当然还是先创建绑定关系。

 

值得注意的是,在使用插件给我们带来的新的延迟交换机时, 由于 SpringAMQP 中并没有内置这种模型,所以需要创建 CustomExchange,也就是自定义交换机。

并且需要设置 CustomExchange 的类型为 x-delayed-message

至于队列和绑定关系的设置,该怎么配就怎么配。

在创建绑定关系时,最终需要调用一下 noargs() 方法,BindingBuilder 在绑定 CustomExchange 时 with() 方法返回值并不会是 Binding 类。

 

3.2、消费者编写

在绑定关系创建完毕之后,对应的消费者也是需要的。

其实这个消费者没有任何特殊的地方,毕竟使用了此插件也只是交换机和发消息时要做出改变,队列本身是没有变化的。

 

3.3、测试延时消息发送

那么现在是 “真” 万事俱备。

写个测试类,来往指定的交换机里发送消息。这里当然是向我们刚创建的延时交换机发消息啦。

在发送消息的地方,也是需要做出处理的。

可以通过以下方法来设置消息的 Header。来达到指定延时时间的目的。

但是有一点很奇妙的是 SpringAMQP 他居然自己集成了对应的API (那为啥不集成延时交换机的API? )

所以可以通过这个方式来设置延时时间:

 

 

最后,代码均编写完毕。

启动消费者服务用以监听队列,然后启动测试类观察消息投放,

 

最终控制台打印:

2020-01-18 12:26:28.808 INFO 24592 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-18 12:26:31.827 INFO 22844 — [cTaskExecutor-1] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-3s

2020-01-18 12:26:33.813 INFO 22844 — [cTaskExecutor-3] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-5s

 

可以看到其完美的符合了需求。

 

 

结语

延时任务这个场景具体的解决方案也就差不多到这了。

死信机制除了比较复杂的延时场景以外,其实也可以满足大多数需求。

那么若是遇到了死信也解决不了的延时场景,RabbitMQ 本身的机制无法实现的话,那么我们可以靠插件来实现对应的需求。

确确实实,RabbitMQ 的这个延时交换机插件还是有点东西的,也难怪 Spring 给他集成了对应的 API。

看了这两篇文章的人,以后若是遇到对应的场景,该用什么就不用我多说了吧 (

 

阅读全文

 

以前我写的几篇 RabbitMQ 相关文章:

RabbitMQ基本简介与我亲身经历的安装流程(CentOS7)

RabbitMQ/AMQP核心概念 , 以及消息流转流程

SpringBoot整合RabbitMQ, 实现生产者与消费者的功能。以及这期间我踩的坑。

关于SpringBoot中RabbitMQ发送消息的回调以及消息确认、重试机制

 

 

关于延时任务,在业务场景中实在是太常见了。比如订单,下单xx分钟未支付就要将订单关闭。 比如红包, XX分钟未抢,则红包失效。

 

那么说起延时任务的实现方案的话,可能有很多人第一时间会想到轮询,即设置定时任务,而稍有经验的开发者就知道。轮询这机制会给数据库带来很大压力,小业务当然无所谓。如果是大量数据要处理的业务用轮询肯定是不行的。而且你如果要保证高可用,就又得牵扯出分布式定时任务。怎么搞都很麻烦。

很多小机灵鬼知道可以用消息队列来实现。确实,MQ的异步性和解耦性在延时任务的这种场景下可以爆发出很强的战斗力。而 RabbitMQ 因其被广泛使用,关于如何实现延时任务自然也有其解决方案。

 

下面本文基于 SpringBoot 环境演示一下使用 RabbitMQ 实现延时任务的方案

用文字和UML活动图来讲一讲所谓 RabbitMQ的 “死信” 机制如何实现延时消息的需求及其功能上的 不足

 

 

 

1、死信是什么

说起死信,balabala的什么死信队列、死信交换机这种名词就出来了。

这个词语有点抽象,但也不是那么难以理解。死信死信,就当他死了(雾

比如你生产者发送一条消息到 MQ Broker ,这条消息因为各种原因没被消费掉,消息最终挂掉了/死了。就可以认为他是死信

那么死信队列呢?死信交换机呢? 其实这两个东西 和普通的队列、交换机是一样的,并没有本质区别

不过可以通过对 RabbitMQ 的配置,将其设置为 “死信” 的处理者。 就是一条消息因为种种原因没被消费掉,最终死了,那么就把这个消息转发给死信交换机、由他来对这个死亡的消息进行处理

这种设置、处理 在 RabbitMQ 中是点对点的,即一个普通队列 可以绑定一个死信交换机。

 

指定队列的死信交换机需要设置队列的属性参数 (arguments)

具体参数名:

绑定死信交换机 : x-dead-letter-exchange

路由死信时使用的 routingkey : x-dead-letter-routing-key

 

2、什么情况会产生死信

在RabbitMQ中,产生死信有这么几种情况

1、队列长度满了

2、消费者拒绝消费消息(丢弃)

3、消息TTL过期

 

这里说到了 TTL ,那么就需要解释一下这是个什么东西。

TTL 是 time to live 的缩写,即生存时间。

RabbitMQ 中可以在队列上、单条消息上设置TTL。如果是设置在队列上,则可以认为该条队列中所有消息的TTL为设定值。

队列TTL属性参数: x-message-ttl

单条消息TTL参数: expiration

 

 

如果设置了TTL值,消息待在队列中的时间超过TTL值后还未被消费的话,消息队列则会将消息丢弃,产生”死信”。

产生死信后,若队列配置了死信交换机,则会将消息流转到绑定的死信交换机中,然后再由死信交换机路由到死信队列

死信队列再推送给这个队列的消费者

 

3、基于死信机制的延时任务实现方案

那么,根据上述 1、2 知识点,对应的延时任务实现方案自然就出来了。

具体方案:

1、创建一个没有消费者的队列,设置TTL值,并绑定死信交换机

2、所有需要延时的消息全部向这条队列发送。

3、死信交换机绑定对应的死信队列,其消费者即为处理延时消息的服务

 

根据以上方案逻辑,在发消息到队列后,必定会等待到消息过期后——即指定的延时时间后,才会有消费者对消息进行处理。

可以实现延时任务的需求。

 

活动图如下所示:

 

 

3、Spring中RabbitMQ死信实现方式

既然知道了原理和机制,那么就先真实上手撸一个出来。

依赖的配置以及具体application.yml文件的书写就不在此进行说明了,想了解详情可以看我以前文章。

 

最重要最核心的是RabbitMQ的队列、交换机配置。

据上述知识点可以得出,只要配置好了TTL、死信交换机,即可实现功能。

那么这里我就直接将我写的配置类贴出:

可以看到我定义了关于 普通队列相关 以及 死信队列相关 的几个常量。

并且基于这些常量实例化出了对应的交换机、队列,并设置了绑定关系。

在实例化普通队列时对其进行了特殊处理; 给普通队列绑定上了死信交换机,并指定好死信 routing key。指定好了其 TTL 值( 5s过期 )后才进行实例化。

 

那么现在以这么一个配置,就已经实现了延时消息需要的所有条件了。

写个消费者、发送者来测试一下。

消费者:

 

发送者:

 

最终控制台结果,确实实现了延时队列的功能:

2020-01-12 11:14:17.582 INFO 12032 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-2] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

 

 

除了队列TTL以外,粒度为消息级别的TTL也是可以设置的。

SpringAMQP 对单条消息的TTL设置,需要在 MessageProperties 类中进行,每个消息都会内置一个此类。

为了方便,SpringAMQP在消息发送流程中提供了一个钩子可以让我们设置Message的属性,那就是 MessagePostProcessor

 

既然他用了 @FunctionalInterface 注解,那为了方便我就用lambda表达式写一个,设置单个消息的TTL为3秒:

将代码修改后再次发送,控制台输出:

2020-01-12 11:51:22.788 INFO 26232 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-12 11:51:25.787 INFO 10576 — [cTaskExecutor-4] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

2020-01-12 11:51:27.784 INFO 10576 — [cTaskExecutor-5] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

 

可以看到,嘿 果不其然,消息接收的有时间差别了,正好符合设置的消息TTL 3s 和队列 TTL 5s 。

 

 

但是,这个功能是有缺陷的

这是使用 RabbitMQ 死信机制来作为延时任务必定会出现的不足之处

下面解释一下

 

4、RabbitMQ死信实现方式缺陷

将上边的发送消息代码,顺序调转一下,如下所示:

 

运行代码,结果,执行偏离了想象… 控制台打印:

2020-01-12 15:00:19.371 INFO 9680 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-12 15:00:24.380 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

2020-01-12 15:00:24.380 INFO 10576 — [cTaskExecutor-3] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者接收消息: 消息体

 

可以看到,消费者消费消息时,都等了整整 5s  !

 

◾ 这是为什么?

这是因为 RabbitMQ 的特性导致的。

RabbitMQ 的队列是一个 FIFO 的有序队列,投入的消息都顺序的压进 MQ 中。

而 RabbitMQ 也只会对队尾的消息进行超时判定,所以就出现了上述的情况。

即哪怕第二条在第3秒时就过期了,但由于第一条消息5秒过期,RabbitMQ会等待到第一条被丢弃后,才对第二条进行判断。最终出现了第一条过期后第二条才跟着过期的结果。

 

 

结语

其实就平时可能遇见的场景而言,使用RabbitMQ 的死信机制就已经足够了。

毕竟大部分延时任务都是固定时间的,比如下单后半小时未支付则关闭订单这种场景。

只要场景是有着固定时间的延时任务的话,RabbitMQ无疑可以很好的承担起这个需求。

针对标题的疑问作出回答的话,可以说出:

RabbitMQ 死信机制能作为延时任务这个场景的解决方案

但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景  下的解决方案。

 

 

阅读全文

nodejs 这玩意,写法基本上和 js 是差不多的。

想着到时候微服务跨语言调用的时候说不准也能有机会用到,就看了看。

 

也挺简单的,如果是指增删改查之类的话。

不过起个服务、写写增删改查着实没意思,所以今天写了个爬虫玩玩

git地址: https://github.com/skypyb/anime-spider

 

 

运行起来(如果你的电脑有node环境的话):

 

说明:

  • 主要是采用事件机制监听爬虫数据, 方便做聚合等操作
  • 现在的话只能爬一下B站和bangumi的,MAL和SATI 之后再看吧,没空写了
  • 现在都是一页页的爬,之后改成异步爬虫 ( B站爬的还是挺快的,毕竟是API请求。 bangumi这种后端渲染出来的HTML页面爬起来是有点慢 )

 

main.js:

 

能获取到的数据

  • bilibili 的番剧排行榜-评分正序
    • 事件  spider.animerank.bilibili
  • bangumi 的动画排行榜-评分正序
    • 事件  spider.animerank.bangumi

 

依赖

  • superagent:    HTTP请求
  • cheerio :   Dom解析
阅读全文

 

相信泛型做 Java 开发的都不陌生,也是天天接触的玩意了。不过真正自己写代码玩泛型玩的比较溜的我看还是比较少的。

基础应用、泛型是什么 这些东西就不说了。J2EE的东西到处都有,而且在职的 Java 开发看这种基础肯定没什么意思。

这篇主要就说一些泛型相关的骚操作。把泛型,给他玩的灵性起来。

 


 

  • 传入 A 对象返回指定的 B 对象如何做到?

实现了这个操作的效果是很感人的,看起来非常魔法,具体是个什么效果? 看下面代码:

 

随便想想就知道,在请求/响应模型的设计中。比如接口请求、第三方API请求等场景用起来的感受。

你封装一个指定的Request,  比如OrderCreateRequest, 中间执行者请求一下,马上一个 OrderCreateResponse 就返回回来了。直接拿起这个对象就能操作下一步业务。 美妙。

那么具体如何实现这样的效果? 感觉有意思的可以先自己写一写,我下面直接贴代码、讲设计:

 

首先是 AClass 和 BClass 这两个基类的设计, 可以将其看为Request/Response :

 

可以看到, B平平无奇。 而 A 有一个泛型。 这个泛型限定的就是B及B的子类

这个A基类就是最重要的设计,所有的奥妙全在这里。并且A 是一个抽象类, 提供了一个 getBType 的方法,  返回值就是泛型的class。

这里是专门为继承做的设计, 如果是在真实业务中,一般这些传输对象都会有几个固定的字段: 比如用户ID/商户ID 之类的。

如果是Request/Response 都有的字段,还会抽出一个BaseModel出来, 两边都继承自它。

 

两方基类已经定义好,那么接下来就是两方的具体实现。

B的实现就不贴了,因为我也没在里面写东西,这里主要看A的实现。

这里 AClassOne 继承了AClass 后, 指定的泛型为 BClassOne。

并且实现了基类的抽象方法, 返回了BClassOne 的 Class 对象.

那么现在如果有一个 AClassOne 对象,  我们就可以直接通过其 getBType() 方法获取到BClassOne.class。

 

好, 现在A实现已经有了,与其对应的B实现也有了。就需要一个中间转化层。

这个转化层一般来说就是做一些Http请求啊、RPC啊之类的。获取到了返回值后封装为你需要的对象返回给你的调用者程序就可以了。

重要是这个方法是怎么定义的?  没看代码的话是有点抽象的, 但实际上看了、写了就明白了。是个很简单的东西:

该方法定义的泛型为: <B extends BClass>    返回的是B实体对象。入参接收一个 AClass<B>

如何获取这个B的class? 从 aClass.getBType() 中获取。

为什么要这么获取? 那是因为java的泛型是会被擦除掉的。你如果不这么定义一个方法,程序跑起来不知道你传进来的是个什么鬼东西。

上面加粗标红的三处关键字, 就是整个设计的精髓。 简单、有效、解耦彻底、拓展性高。

 


 

  • 泛型递归的用法

泛型递归? 有人看到这可能就感觉很诡异。 这又是个什么玩意,听都没听过。

来我贴一个实现, 自己感受一下,这玩意也挺抽象,我来细细讲一下。

你看到的这个类: GenericRecursive 泛型定义为E ,E的约束条件是必须为 GenericRecursive 实现类,如果只是这样还好。

但是偏偏后边还跟了一个<E> 。GenericRecursive<E>  这个E是什么呀?

那不就是<E extends GenericRecursive<E>>

这不就递归起来了?什么意思?

 

其实这里这个 E 就是用于约束实现类的。

这里我写了个实现类:

这个实现类,实现了 GenericRecursive 接口,泛型只能指定为自己。

即 GenericRecursiveImpl 实现 GenericRecursive 时, E 必为 GenericRecursiveImpl 。

通过继承和泛型,可以强制子类型所有泛型相关对象的都是子类型本身

 

这个泛型的逻辑:

已知:

GenericRecursive<E extends GenericRecursive<E>>

条件:

E = GenericRecursiveImpl :  泛型约束 E extends GenericRecursive 成立

所以:

因为E = GenericRecursiveImpl 得出:  GenericRecursive<GenericRecursiveImpl>

GenericRecursiveImpl implement GenericRecursive<GenericRecursiveImpl>

 

 

那他的用处呢? 可以在哪里使用到?

他的最主要作用就是强制约束,泛型递归可以让你的系统更加的严谨。

java.lang.Enum 这个类是所有枚举的基类, 看下他的源码,他就使用到了泛型递归:

其实可以从他的角度思考一下,强制约束子类的泛型是子类,有什么用?

观察他的 compareTo() 方法:

为了具体说明泛型递归在枚举中的用处,我写了个演示:

随便写了个临时枚举: TempEnum

可以看到他的 compareTo 方法,传入的类型被限定死了,只能传入 TempEnum

相信从这点中小伙伴们肯定对于泛型递归的好处和用处都可以理解通透了。

 


 

结语:

泛型这玩意, 这些骚操作要是不说出来整理一下,可能还有不少人都不知道Java泛型能这么玩。

当然,其实这种操作你在平时的业务中确实不怎么可能会有机会用到。

但是,有一天你要自己写工具、SDK的时候。相信我,这篇文章里的泛型操作肯定有机会用到的。用出来后那肯定是效果拉满。

我在一些工具类、框架里边已经发现了。

不瞒你说,这篇文章里 传入 A 返回 B 这个操作,就是我从阿里的某SDK中偷学来的 ( 笑 。

 

阅读全文

 

引言

这篇主要表明先开个坑,   后续能填多少是多少。反正坑也有几个没填完的

比如:   探秘分布式解决方案啦(目前4篇)  微服务组件使用啦(目前10篇)   —  [ 2019-12-15数据 ]   慢慢来吧。

目前想法是能写出这么几篇文章: 泛型骚操作、枚举骚操作、链式调用法设计、将 Java8 的Stream终止操作玩出花、解耦合之道(设计思想: 迪米特、开闭、依赖倒置等)、我经历过的设计模式实际应用场景、从主流框架源码取(抄)架构实现自己的工具、 CRUD业务代码细节优雅之道

咳咳,但是能写不代表会写,懂我意思吧。

探秘分布式解决方案  我还有消息驱动分布式事务、分布式锁具体实现代码、分布式定时任务、分布式Session等一堆没写,微服务组件使用就更多了,填不完。

小声BB :   甚至现在有 6 篇其他技术内容的草稿写了一半放在那里没动了

 


 

说在前头:

做程序员的,虽说每天在公司CRUD、改屎山,但是多多少少的都会有点想写个巨牛逼的东西给别人使用。

一年半前我自己还捣鼓出了一个小型ORM,只支持单表。设计现在来看那是惨不忍睹,各种强耦合。不过如果是现在的我,在写工具类时已经可以写的高大上了。

虽然说大部分人自己一个人捣鼓出的东西都是自己自嗨,也包括我。

不过哪怕只是自嗨用,也要以最为优雅的代码来实现,起码当作品完成时,回头一看: 啊,这就是爷写出的代码,比业务代码高出不知道多少,牛批!

 

 

抛开自嗨, 在真实的生产环境、公司的业务代码里,也会有机会可以将你优雅的设计体现出来。

公司的代码大部分逻辑都是CRUD,想把这些玩意写优雅挺困难的。但是总有一天,可能是全新模块、可能是让你对接什么服务, 有一个机会摆在你的面前时。此时若是由你来作为主导,那么 优雅的设计、漂亮的代码,将会使你写出的代码价值最大化。避免后人的埋怨。 人家点开一看: “哇,这设计太牛逼了”。  想想也是令人很激动的。

更别说指不定哪天有人让你封装一些工具啥的。要是有这种机会那就真的太爽了。这种完全和业务无关,作为解决方案的代码可以将你的毕生绝学发挥出来,那这个时候你要是身上没有绝学,就很尴尬,只能封装出一坨奇妙的东西。

 

 

我开这个坑的主要动力。

就是记录下我知道的一些技巧性质的操作 (骚操作)。还有设计理念。

上次设计模式的文章还是 2018-11-06 号写的状态模式了。虽然后续也学了不少其他的设计模式,但都没写出来。挺遗憾的。

 

 

 

 

 

 

阅读全文
EA PLAYER &

历史记录 [ 注意:部分数据仅限于当前浏览器 ]清空

      00:00/00:00