1. MQ
MQ本质是个队列,是一种跨进程的通信机制,是一种上下游“逻辑解耦+物理解耦”的消息通信服务。消息发送上游只需要依赖 MQ,不用依赖其他服务。
使用MQ三大好处:
- 流量消峰
服务器的处理能力有限,若在某个时刻QPS(每秒查询率)激增,可能会导致系统无法承受而崩溃,此时可以将请求堆积在MQ中,保护下游系统。
QPS指的是“每秒查询率”;而TPS指的是“事务数/秒”。
对于一个页面的一次访问,形成一个Tps;而一次页面请求,可能产生多次对服务器的请求,服务器对这些请求,就可计入“Qps”之中。
- 应用解耦
当每次要给系统添加功能时都需要重新修改原来系统的代码,大大增加系统的维护成本。使用MQ后新加服务只需要消费数据即可,不需要修改原系统。
- 异步处理
以支付系统为例,用户发起支付请求,如果后台业务复杂,可能会花费较长时间,此时用户体验会很不好。支付系统在接收请求后将支付结果发送给MQ并直接响应用户,无论后端业务逻辑有多复杂,用户都无法察觉。比如我们日常在完成支付后都会立刻返回支付成功,但过一会才会收到扣费的通知。
分类
- ActiveMQ
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,较低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
- Kafka
为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量闻名。
优点:性能卓越,单机写入 TPS 约在百万条/秒,最大的优点就是吞吐量高。时效性 ms 级,可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,消费者采用 Pull 方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:Kafka 单机超过 64 个队列/分区,cpu负载(load)会发生明显的飙高现象,队列越多,load 越高,发送消 息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
- RocketMQ
自阿里巴巴的开源产品,用 Java 语言实现
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降。
缺点:支持的客户端语言不多,目前是 java 及 c++
- RabbitMQ
在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言
2. RabbitMQ
RabbitMQ 是一个消息中间件:它接受、存储和转发消息数据。
四大核心:
- 生产者:产生数据发送消息的程序是生产者
- 交换机:一方面接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,由交换机类型决定。
- 队列:RabbitMQ 内部使用的一种数据结构,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
- 消费者:消费者大多时候是一个等待接收消息的程序。
核心组成部分
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host:虚拟地址,用于进行逻辑隔离,不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,是最上层的消息路由,一个虚拟主机可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:保存消息并将它们转发给消费者。
安装
RabbitMQ安装需要安装对应的erlang环境,可以在rabbitmq - Repositories · packagecloud查找。
RabbitMQ的官网难找到对应操作系统的版本,可以在上述网站中查,根据介绍安装即可。
1 2 3 4
| #检测是否安装成功 rpm -qa|grep rabbitmq #设置开机启动 chkconfig rabbitmq-server on
|
RabbiltMQ开启web管理插件:
1
| rabbitmq-plugins enable rabbitmq_management
|
访问地址http://ip:15672/
创建账户:
1 2 3 4 5 6
| rabbitmqctl add_user 用户名 密码 #rabbitmqctl delete_user 用户名 #设置超级管理员角色 rabbitmqctl set_user_tags 用户名 administrator #查看所有用户和角色 rabbitmqctl list_users
|
1 2 3 4
| #设置用户权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read> #具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
|
web应用命令:
1 2 3 4 5 6
| #关闭应用 rabbitmqctl stop_app #清除 rabbitmqctl reset #重启 rabbitmqctl start_app
|
3. 工作模式
RabbitMQ官方介绍有七种工作模式,详情见RabbitMQ Tutorials
Hello World
由一个生产者和一个消费者组成,也称为简单模式。
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class Producer { private static final String NAME="HELLO"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.64.105"); factory.setUsername("john"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(NAME,false,false,false,null);
channel.basicPublish("",NAME,null,"hello".getBytes()); System.out.println("消息发送完毕"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.64.105"); factory.setUsername("john"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback dc=(tag,delivery)->{ System.out.println(new String(delivery.getBody())); }; CancelCallback cc=(tag)->{ System.out.println("消费中断"); };
channel.basicConsume("HELLO",true,dc,cc); } }
|
Work Queues
简单模式的强化版,也称为工作队列。一个队列可以有多个消费者来竞争消费消息,但是我们仍需保证队列的幂等性(任意次执行对资源本身所产生的影响与一次执行的影响相同),队列存在就不能再创建同名队列。
轮训分发消息
生产者:
1 2 3 4 5 6
| Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String s = scanner.nextLine(); channel.basicPublish("", NAME, null, s.getBytes()); System.out.println("消息发送完毕"); }
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| channel.basicConsume("HELLO", false, (tag, delivery) -> { System.out.println(new String(delivery.getBody())); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("w1执行完成,删除消息!"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }, (tag) -> { System.out.println("消费中断"); }); }
|
启用两个工作线程,进入Run/Debug Configurations
界面,点击Modify options
:
当我们开启多个工作线程时,队列会默认采取轮训的方式发送消息。简单模式时和工作队列模式的区别只是增加了消费者,队列会自动处理。
消息应答
为了保证消息在发送过程的安全性,rabbitmq引入了消息应答机制:消费者在接收到消息并且处理该消息之后,会通知rabbitmq,随后 rabbitmq 把该消息删除。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。
一方面消费者连接失败或channel关闭会导致消息丢失;另一方面没有对传递的消息数量进行限制,消费者可能因为消息的积压最终宕机,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
手动应答
1 2
| Channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false)
|
否定确认时,需要指定是丢弃掉这条消息,还是让这条消息重新排队,过一会再来,又或者是让这条消息重新排队,并尽快让另一个消费者接收并处理它
1 2 3 4
| channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false);
channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
|
multiple 的 true 代表批量应答 channel 上未应答的消息,false 代表仅确认当前消息。
设置手动应答后,当某一个消费者处理消息时宕机,rabbitmq会将消息重新排队,确保消息不会丢失。
持久化
为了让rabbitmq崩溃时能保证队列和消息的安全性,我们需要将队列和消息都标记为持久化。
队列持久化
1
| channel.queueDeclare(NAME,true,false,false,null);
|
注意:不能同时存在同名的队列,需要将原先的队列删除。
其中D就表示持久化,此时即使重启rabbitmq队列也依然存在。
消息持久化
1
| channel.basicPublish("", NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, s.getBytes());
|
消息标记为持久化并不能完全保证不会丢失消息,消息的缓存和持久化还有一段时间,还需要发布确认章节的知识才能组成更强有力的持久化策略。
不公平分发
最开始我们学习的轮训模式不是很合理,由于物理机的性能差异,所以我们应该使用不公平分发,即“能者多劳”
消费者:
相当于每个消费者声明只能处理一个任务,完成当前任务后才会接取下一个任务,但如果所有的消费者都没有完成当前任务,可能会出现队列被撑满的情况。
预取值
channel中存在一个未确认的消息缓冲区,我们可以限制此缓冲区的大小,以实现最大的性能。
该值过大会增加消费者的RAM消耗(随机存取存储器)
发布确认模式
官方名称为Publisher Confirms
队列持久化+消息持久化+发布确认能极高地保证消息的安全性。
开启发布确认:
1 2
| channel.confirmSelect();
|
- 单个确认发布
是一种同步确认发布的方式,也就是一个消息被确认发布,后续的消息才能继续发布。
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出
1 2 3 4 5 6 7 8 9 10 11 12
| public static void singlePublish() throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.confirmSelect(); for (int i = 0; i < 10; i++) { channel.basicPublish("", "HELLO", null, (i + "").getBytes(StandardCharsets.UTF_8)); boolean b = channel.waitForConfirms(); if (b) { System.out.println("发送成功!"); } } }
|
缺点:发布速度特别慢。
- 批量确认发布
这种方式可以极大地提高吞吐量,缺点是发布失败时,不知道哪个消息出现问题,当然这种方式也是同步的,一样会阻塞消息的发布。
1 2 3 4 5 6 7 8 9 10 11
| public static void batchPublish() throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.confirmSelect(); for (int i = 0; i < 100; i++) { channel.basicPublish("", "HELLO", null, (i + "").getBytes(StandardCharsets.UTF_8)); if (i != 0 && i % 9 == 0) { channel.waitForConfirms(); System.out.println(i / 9); } } }
|
- 异步确认发布
异步确认发布利用回调函数可靠地传递消息,不会阻塞消息发布,可靠性和效率都是最高的。
需要添加异步的监听器:
1 2
| ConfirmListener addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
|
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| ConcurrentSkipListMap<Long,String> map=new ConcurrentSkipListMap<>();
channel.addConfirmListener((messageNumber,multiple)->{ if(multiple){ ConcurrentNavigableMap<Long, String> temp = map.headMap(messageNumber, true); temp.clear(); }else{
} },(messageNumber,multiple)->{ if(multiple){
}else{
} });
|
可以把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
交换机
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。
生产者只能将消息发送到交换机(exchange);之前我们传递消息使用的是默认交换机。
从上图我们可以看到 Exchanges 的类型总共有四种:
- 扇出(fanout)
- 直接(direct)
- 主题(topic)
- 标题(headers):基本不用
无名交换机
前面我们没有指定交换机,依然能将消息发送到队列:
1
| channel.basicPublish("", "HELLO", null, s.getBytes());
|
第一个参数是交换机的名称,空串表示默认交换机;第二个参数是交换机和队列的绑定关系:routingKey(bindingkey)
默认交换机隐式绑定到队列,路由密钥等于队列名称。无法显式绑定到默认交换或取消绑定。
临时队列
可以创建一个具有随机名称的队列,一旦我们断开了消费者的连 接,队列将被自动删除。
1
| String queueName = channel.queueDeclare().getQueue();
|
绑定(bindings)
binding 是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队 列进行了绑定关系。
Fanout
扇出交换机将接收到的所有消息广播到它绑定的所有队列中。也称为发布/订阅(Publish/Subscribe)模式。
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare("exchange1", BuiltinExchangeType.FANOUT); channel.queueDeclare("test1",false,false,false,null); channel.queueDeclare("test2",false,false,false,null); channel.queueBind("test1", "exchange1", "log"); channel.queueBind("test2", "exchange1", "log"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { channel.basicPublish("exchange1", "log", false, null, scanner.nextLine().getBytes(StandardCharsets.UTF_8)); System.out.println("成功发送!"); } }
|
test1,test2
两个不同的队列会接收到相同的消息。即使 binding key 不同也会收到相同的消息。
Direct
消息只去到它绑定的 routingKey 队列中去。功能比Fanout强大,也称为路由(Routing)模式。
1 2 3 4 5 6 7 8 9 10
| channel.exchangeDeclare("exchange1", BuiltinExchangeType.DIRECT); ... channel.queueBind("test1", "exchange1", "log1"); channel.queueBind("test2", "exchange1", "log2"); for (int i = 1; i < 6; i++) { channel.basicPublish("exchange1", "log1", null, ("消息" + i).getBytes(StandardCharsets.UTF_8)); } for (int i = 11; i < 16; i++) { channel.basicPublish("exchange1", "log2", null, ("消息" + i).getBytes(StandardCharsets.UTF_8)); }
|
test1,test2
两个队列由于 binding key 的不同会收到不同的消息。实际运用中可以使用map存储binding key和消息。
多重绑定
当多个队列使用同一个 binding key ,则功能类似于Fanout。一个队列绑定多个 binding key :
Topics
Direct 尽管已经可以有选择性地发送消息,但仍然不够灵活。
topic 交换机的 routing_key 不能随意,必须是一个单词列表,以点号分隔开。
替换符:
1 2 3 4 5 6 7 8 9 10
| channel.exchangeDeclare("exchange1", BuiltinExchangeType.TOPIC); ...
channel.queueBind("test1", "exchange1", "#.test"); channel.queueBind("test1", "exchange1", "log.*"); channel.queueBind("test2", "exchange1", "*.rabbitmq.*");
channel.basicPublish("exchange1", "log.rabbitmq.test", null, ("都能收到").getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange1", "test.rabbitmq.log", null, ("test2的消息").getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange1", "test.log.rabbitmq", null, ("都收不到").getBytes(StandardCharsets.UTF_8));
|
死信队列
无法被消费的消息被称为死信,处理死信的队列称为死信队列。
死信的来源:
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
消息过期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare("normalE", BuiltinExchangeType.DIRECT); channel.exchangeDeclare("deadE", BuiltinExchangeType.DIRECT); channel.queueDeclare("deadQ", false, false, false, null); channel.queueBind("deadQ","deadE","dead"); Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange","deadE"); map.put("x-dead-letter-routing-key","dead"); channel.queueDeclare("normalQ", false, false, false, map); channel.queueBind("normalQ", "normalE", "normal"); AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build(); channel.basicPublish("normalE","normal",properties,"死信".getBytes(StandardCharsets.UTF_8)); }
|
消息过期后会被送到”deadQ”,如果我们不配置死亡队列,消息过期后会丢失。
队列达到最大长度
1 2
| map.put("x-max-length",6);
|
消息被拒绝
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.basicConsume("normalQ",false,(tag,message)->{ System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicReject(message.getEnvelope().getDeliveryTag(),false); },(tag)->{
}); }
|
4. 延迟队列
延时队列最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。
使用场景:
- 订单在十分钟之内未支付则自动取消
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员
- 预定的时间点前十分钟通知各个与会人员参加会议等
可以使用死信队列的方式来实现延迟队列的效果
整合SpringBoot
使用 Spring Initailizr 生成项目,导入依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
<dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
|
修改配置文件:
1 2 3 4 5 6
| spring: rabbitmq: host: 192.168.64.105 port: 5672 username: john password: 123456
|
添加Swagger配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableWebMvc @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); }
private ApiInfo webApiInfo() { return new ApiInfoBuilder() .title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("LemonPuer", "https://baidu.com", "123456789@qq.com")) .build(); } }
|
SpringBoot在2.6版本之后处理映射匹配的默认策略发生变化;导致出现与Swagger不兼容的情况
1 2 3 4 5 6 7 8
| public static class Pathmatch { private MatchingStrategy matchingStrategy = MatchingStrategy.ANT_PATH_MATCHER; }
public static class Pathmatch { private MatchingStrategy matchingStrategy = MatchingStrategy.PATH_PATTERN_PARSER; }
|
解决方案1:配置文件添加@EnableWebMvc
注解
方案2:配置文件中添加spring.mvc.pathmatch.matching-strategy=ant_path_matcher
示例
配置文件类中创建好交换机和队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Configuration public class MyConfiguration { @Bean public DirectExchange xExchange() { return ExchangeBuilder.directExchange("X").build(); } @Bean("queueA") public Queue queueA() { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", "Y"); map.put("x-dead-letter-routing-key", "YD"); map.put("x-message-ttl", 10000); return QueueBuilder.nonDurable("QA").withArguments(map).build(); }
@Bean public Binding queueABindingX( //@Bean的方法名就是对象名 @Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange ) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } }
|
存在多个相同类型的对象,所以这里使用@Qualifier根据对象名引入
生产者代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @RestController @RequestMapping("/test") public class TestController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value = "/{message}",produces = "application/json;charset=utf-8") public String getTest1(@PathVariable String message){ log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message); rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message); rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message); return "接收完毕!"; } }
|
设置全部返回数据的编码还是需要配置CharacterEncodingFilter
消费者:
1 2 3 4 5 6 7 8 9
| @Slf4j @Component public class DelateConsume { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg); } }
|
这样我们就使用死信队列完成了延迟队列的功能,但仍有不足:队列的延迟时间固定,不够灵活。
设置消息的TTL
1 2 3 4 5 6 7 8 9 10 11 12 13
| @GetMapping(value = "/{message}/{time}", produces = "application/json;charset=utf-8") public String getTest2( @PathVariable("message") String message, @PathVariable("time") String time ) { log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),time, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(time); return msg; }); return "接收完毕!"; }
|
看似好像解决了问题,但消息可能不会“按时死亡”:
因为RabbitMQ 只会检查第一个消息是否过期,后面的消息过期也不会被优先执行。
延时队列插件
进入RabbitMQ的社区插件
下载rabbitmq_delayed_message_exchange
插件,然后放置到 RabbitMQ 的插件目录/usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
。
1 2
| #开启插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
注意需要下载对应版本的插件
重启RabbitMQ生效:
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Bean public CustomExchange zExchange() { Map<String, Object> map = new HashMap<>(); map.put("x-delayed-type", "direct"); return new CustomExchange("Z", "x-delayed-message", false, false, map); }
@Bean public Queue queueE() { return QueueBuilder.nonDurable("QE").build(); }
@Bean public Binding queueEBindingX( @Qualifier("queueE") Queue queueE, @Qualifier("zExchange") CustomExchange zExchange ) { return BindingBuilder.bind(queueE).to(zExchange).with("ZE").noargs(); }
|
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @GetMapping(value = "/{message}/{time}", produces = "application/json;charset=utf-8") public String getTest3( @PathVariable("message") String message, @PathVariable("time") Integer time ) { log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列 C:{}", new Date(), time, message); rabbitTemplate.convertAndSend("Z", "ZE", message, msg -> { msg.getMessageProperties().setDelay(time); return msg; }); return "接收完毕!"; }
|
注意这里使用的是setDelay()
消费者:
1 2 3 4 5
| @RabbitListener(queues = "QE") public void receiveE(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列信息{}", new Date().toString(), msg); }
|
结果:
当然,延时队列还有很多其它选择,比如利用Java
的DelayQueue
,利用Redis
的zset
,利用Quartz
或者利用kafka
的时间轮,这些方式各有特点,看需要适用的场景。
5. 高级发布确认
当RabbitMQ故障重启期间,生产者投递的消息都会丢失,需要手动处理和恢复。
单机环境下,可以让交换机在收到消息后再删除缓存中的消息。
配置文件:
1
| spring.rabbitmq.publisher-confirm-type=correlated
|
- NONE(默认):禁用发布确认模式‘
- CORRELATED:发布消息成功到交换器后触发回调方法
- SIMPLE:会触发回调方法;并且发布消息成功后使用
rabbitTemplate
调用waitForConfirms
或waitForConfirmsOrDie
方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie
方法如果返回false则会关闭channel,则接下来无法发送消息到broker
接收回调
回调接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @Component public class MyCallback implements RabbitTemplate.ConfirmCallback {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到 id 为:{}的消息", id); } else { log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause); } } }
|
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Slf4j @RestController public class PublishProducer { @Autowired public RabbitTemplate rabbitTemplate; @Autowired public MyCallback myCallback; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(myCallback); }
@GetMapping("/{message}") public String getTest4(@PathVariable String message){ CorrelationData data1=new CorrelationData(); data1.setId("1"); rabbitTemplate.convertAndSend("Z","ZE","ZE+"+message,data1); log.info("ZE发送消息内容:{}",message); CorrelationData data2=new CorrelationData(); data2.setId("2"); rabbitTemplate.convertAndSend("Z","AE","AE+"+message,data2); log.info("AE发送消息内容:{}",message); return "success!"; }
|
如猜想一样,此方式只能确保交换机能接收到消息,并不能保证消息的安全性
回退消息
通过设置 mandatory 参数可以在消息传递过程中不可达目的地时将消息返回给生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Slf4j @Component public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired public RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到 id 为:{}的消息", id); } else { log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause); } }
@Override public void returnedMessage(ReturnedMessage returnedMessage) { log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); } }
|
mandatory
、publisher-confirms
、publisher-return
属性:
1 2 3 4 5 6 7 8
| spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
|
mandatory
、publisher-return
差异:
- mandatory属性的优先级更高
- mandatory属性可能会返回三种值null、false、true
- mandatory结果为null(即不配置)时结果由publisher-returns确定
备份交换机
程序架构:
1 2 3 4 5 6 7
| public static final String CONFIRM_EXCHANGE = "confirm.exChange"; @Bean public DirectExchange confirmExChange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(false) .withArgument("alternate-exchange", BACKUP_EXCHANGE).build(); }
|
这里展示交换机需要添加的参数,其它部分正常编写即可
消费者:
1 2 3 4
| @RabbitListener(queues = MyConfiguration.WARNING_QUEUE) public void receiveWarning(Message message, Channel channel) throws IOException { log.error("报警发现不可路由消息:{}", new String(message.getBody())); }
|
mandatory
参数与备份交换机可以一起使用,但备份交换机的优先级更高。
6. 其它知识点
幂等性
用户对于同一操作发起的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。
消息重复消费:
消费者在给 MQ 返回 ack 时网络中断,该条消息被重新发给其他的消费者,或者在网络重连后再次发送给该消费者,造成消费者消费了重复的消息。
业界主流的幂等性有两种操作:
指纹码就是根据某些规则得到的唯一信息码。利用查询语句判断该id是否存在数据库。
优势:实现简单。劣势:高并发场景下窜在写入性能瓶颈,可以采用分库分表提升性能。
利用setnx
指令从而避免重复消费。
优先级队列
在订单催付场景中,我们需要根据客户级别调整订单处理顺序。
队列创建:
1 2 3 4
| Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10); channel.queueDeclare("hello", false, false, false, params);
|
生产者:
1 2 3
| AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
|
消息的默认优先级为最低
唯有消息在队列中堆积,才会有优先级的判断。
惰性队列
3.6.0 版本开始引入了惰性队列的概念。一般情况下,队列中的消息存在于内存中,惰性队列会将消息存入磁盘,在消费之消费到消息时才会被加载到内存中 。
其设计目标是能够支持更长的队列,即支持更多的消息存储。
惰性队列可以在声明队列的时候使用:
1 2 3
| Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
|
也可以通过 Policy 的方式设置, Policy 的方式具备更高的优先级。
Policy
Policies是RabbitMQ的Server端机制
打开Add / update an operator policy
- Name:Policy的名称
- Pattern:此Policy根据正则表达式去匹配Queues/Exchanges名称
- Apply to:此Policy对Queue还是对Exchange生效,或者两者都适用
- Priority:优先级
- Definition:添加的args,KV键值对。
Operator Policy 和 User Policy 的区别:
- Operator Policy 是给服务提供商或公司基础设施部门用来设置某些需要强制执行的通用规则
- User Policy 是给业务应用用来设置的规则
Operator Policy 和 User Policy 会合并后作用于队列,并且为防止 Operator Policy 对队列某些关键属性例如死信队列交换器Dead Letter Exchange
的覆盖导致业务应用产生非预期的结果,Operator Policy 只支持 expire
、message-ttl
、max-length
、max-length-bytes
4个参数。
7. RabbitMQ集群
主从副本集群:
- 修改 3 台机器的主机名称
- 配置各个节点的 hosts 文件,让各个节点都能互相识别
- 确保各个节点的 cookie 文件使用的是同一个值
1
| scp /var/lib/rabbitmq/.erlang.cookie root@从机器名:/var/lib/rabbitmq/.erlang.cookie
|
- 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(全部机器都要执行)
1
| rabbitmq-server -detached
|
- 从机器执行:
1 2 3 4 5 6 7
| #rabbitmqctl stop 会将 Erlang 虚拟机关闭 #rabbitmqctl stop_app 只关闭 RabbitMQ 服务 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@集群中的机器名 #只启动应用服务 rabbitmqctl start_app
|
- 查看集群状态
1
| rabbitmqctl cluster_status
|
- 重新设置用户(集群中任意一台机器)
1 2 3 4 5 6
| #创建账号 rabbitmqctl add_user admin 123 #设置用户角色 rabbitmqctl set_user_tags admin administrator #设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
|
- 解除集群节点
1 2 3 4 5
| rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmqctl cluster_status rabbitmqctl forget_cluster_node rabbit@从机器名(上级机器上执行)
|
默认情况下RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。
但消息队列是例外,默认情况下,消息队列仅位于一个节点上,尽管它们可以从所有节点看到和访问
当队列所在的机器宕机,队列就会失去作用。
镜像集群:
镜像集群是基于主从副本集群的,解决了队列的单点故障问题。
搭建步骤:
随便再一个节点中添加policy
改策略会将mirrior
开头的交换机和队列备份一份,即整个集群有两份。
该策略会保证集群中一直有两份数据,当其中一份数据所在的节点宕机,会自动在其他节点中再备份一份。
实现高可用负载均衡
使用Haproxy+Keepalive
两项工具
Haproxy
HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案。
目前关于网站架构一般比较合理流行的架构方案:Web前端采用Nginx/HAProxy+Keepalived作负载均衡器;后端采用MySQL数据库一主多从和读写分离,采用LVS+Keepalived的架构。
整体架构大都相似:
搭建:
- 下载
haproxy
- 修改配置文件的ip为当前机器ip
1
| vim /etc/haproxy/haproxy.cfg
|
- 启动
1 2
| haproxy -f /etc/haproxy/haproxy.cfg ps -ef | grep haproxy
|
- 访问地址:
ip:8888/stats
Keepalived
能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移。
搭建:
- 下载
1
| yum -y install keepalived
|
- 修改配置文件
1
| vim /etc/keepalived/keepalived.conf
|
Nginx章节已做简单介绍,此处不再赘述
- 添加
haproxy_chk.sh
该脚本用来检测 HAProxy 务的状态,当 HAProxy 服务挂掉之后该脚本会自动重启 HAProxy 的服务,如果不成功则关闭 Keepalived 服务,这样便可以切换到 Backup 继续工作。
- 启动
1
| systemctl start keepalived
|
- 观察
Keepalived
的日志
1
| tail -f /var/log/messages -n 200
|
- 观察最新添加的 vip
Federation
Federation 插件的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群,该功能在很多场景下都非常有用。
Federation 插件能够在不同管理域(可能设置了不同的用户和 vhost,也可能运行在不同版本的 RabbitMQ 和 Erlang 上)中的 Broker 或者集群之间传递消息。
Federation 插件基于 AMQP 0-9-1 协议在不同的 Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况。
一个 Broker 节点中可以同时存在联邦交换器(或队列)或者本地交换器(或队列),只需要对特定的交换器(或队列)创建 Federation 连接(Federation link)。
一个联邦交换器(federated exchange)或者一个联邦队列(federated queue)接收上游(upstream)的消息,这里的上游是指位于其他 Broker 上的交换器或者队列。联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游队列(upstream queue)的消息。
Shovel
与 Federation 具备的数据转发功能类似,Shovel 能够可靠、持续地从一个 Broker 中的队列拉取数据并转发至另一个 Broker 中的交换器。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker 上,也可以位于不同的 Broker 上。
主要优势:
- 松耦合。Shovel 可以移动位于不同管理域中的 Broker(或者集群)上的消息,这些 Broker(或者集群)可以包含不同的用户和 vhost,也可以使用不同的 RabbitMQ 和 Erlang 版本。
- 支持广域网。Shovel 插件同样基于 AMQP 协议在 Broker 之间进行通信,被设计成可以容忍时断时续的连通情形,并且能够保证消息的可靠性。
- 高度定制。当 Shovel 成功连接后,可以对其进行配置以执行相关的 AMQP 命令。