说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。
一、MQ的简单理解1. 什么是MQ?- 消息队列(message Queue),是基础数据结构中 “先进先出” 的一种数据结构。
- 一般用来解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。
- 生产者产生消息并把传输的数据(消息)放在队列中,用队列机制来实现消息传递。
- 消费者可以到指定的队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
- 解耦:一个业务需要多个模块共同实现,或一条消息有多个系统对应处理,只需要在主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
- 异步:主业务执行结束后,从属业务通过MQ异步处理,减少业务的响应时间,提高用户体验。
- 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
- 系统可用性降低。依赖服务越多,服务越容易挂掉。需要考虑MQ瘫痪的情况。
- 系统的复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。
- 业务一致性。主业务和从属业务一致性的处理。
RabbitMQ是消息代理,它接受并转发消息。
- RabbitMQ可以理解为一个邮箱,或者一个邮局,或者是一个邮递员,保证 “张三” 的信件最终传递给 “李四”。
- RabbitMQ与上述所描述的邮局(邮箱、邮递员)的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块消息。
- 生产:生产只意味着发送。发送消息的程序是生产者(production)。
- 队列:队列是位于RabbitMQ中的“邮箱”的名称。尽管消息流经RabbitMQ和应用程序,但他们只能存在于队列中。队列只受主机的内存和磁盘限制,它的本质上是一个打的消息缓冲区。许多生产者可以向一个队列发送消息,许多消费者可以尝试从一个队列接收数据。
- 消费(接收):消费与接收具有相似的含义。一个消费者(consumer)是一个程序,主要是等待接收信息。
注意:生产者、消费者、代理不必部署在同一主机上,应用程序既可以是生产者,又可以是消费者
三、RabbitMQ安装3.1环境说明(本文以RabbitMQ3.8.11为例)RabbitMQ对Erlang版本要求(Rabbit是基于Erlang编写的)
RabbitMQ对JDK版本要求
3.2 安装Erlang步骤(本文以windows版安装为例)3.2.1 下载Erlang,或访问如下链接进行下载:
http://erlang.org/download/otp_win64_23.2.exe
3.2.2 双击运行 otp_win64_23.2.exe ,点击下一步完成安装。
3.2.3 安装完成后配置环境变量,如下图所示
3.2.4 运行窗口输入cmd,在dos窗口输入 erl ,返回如图中所示,则代表erlang安装完成。
3.2 安装RibbitMQ步骤(本文以windows版安装为例)3.2.1 点击下载RibbitMQ,或访问如下链接进行下载:
http://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.11/rabbitmq-server-3.8.11.exe
3.2.2 双击运行 rabbitmq-server-3.8.11.exe,点击下一步完成安装。
3.2.3 双击RabbitMQ Service - start 运行RabbitMQ
出现如下提示,则代表服务启动成功:
3.2.4 访问RabbitMQ控制台
控制台地址: http://localhost:15672/
控制台用户名/密码 : guest/guest
四、RabbitMQ传递消息的方式(Java客户端)- Work queues(工作队列)
- Publish/Subscribe(发布/订阅)
- Routing(路由)
- Topics(主题)
- RPC(远程过程调用)
- Publisher Confirms(发布者确认)
环境要求:
- JDK版本为15(1.8 即可)
- amqp-client 5.10.0
添加依赖:
<!--ribbitMq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
4.1 Work queues(工作队列)
官方描述:
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。
代码示例:
生产者:
1 public class NewTask {
2
3 private static final String TASK_QUEUE_NAME = "task_queue";
4
5 public static void main(String[] args) throws Exception{
6
7 Connectionfactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
18
19 String message = String.join(" ", "four");
20
21 channel.basicPublish("", TASK_QUEUE_NAME,
22 MessageProperties.PERSISTENT_TEXT_PLAIN,
23 message.getBytes(StandardCharsets.UTF_8));
24
25 System.out.println(" [x] Sent '" message "'");
26 }
27 }
28 }
消费者:
1 public class Worker {
2
3 private static final String TASK_QUEUE_NAME = "task_queue";
4
5 public static void main(String[] args )throws Exception {
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 final Connection connection = factory.newConnection();
16 final Channel channel = connection.createchannel();
17
18 channel.queueDeclare(TASK_QUEUE_NAME, true, false,false,null);
19 System.out.println(" [*] Waiting for messages. To exit press CTRL C");
20
21 channel.basicQos(1);
22
23 DeliverCallback deliverCallback = (comsumerTag, delivery) ->{
24 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25
26 System.out.println(" [x] Received '" message "'");
27
28 try {
29 doWork(message);
30 } finally {
31 System.out.println("[x] Done");
32 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
33 }
34 };
35 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, comsumerTag -> {});
36 }
37
38 private static void doWork(String task){
39 for (char ch : task.toCharArray()){
40 if(ch == '.'){
41 try {
42 Thread.sleep(1000);
43 } catch (InterruptedException e) {
44 Thread.currentThread().interrupt();
45 }
46 }
47 }
48 }
49 }
4.3 Publish/Subscribe(发布/订阅)
官方描述:
RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。
简而言之:
相当于我们关注了一个微信公众号,公众号每次推文我们都能及时的收到。我们就相当于消费者,公众号相当于消息中转站,文章作者相当于生产者。
代码示例:
生产者:
1 public class EmitLog {
2
3 private static final String ExCHANGE_NAME = "logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18
19 String message = args.length < 1 ? "info: Hello World!" : String.join(" ", args);
20
21 channel.basicPublish(ExCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
22
23 System.out.println(" [x] Sent '" message "'");
24
25 }
26
27 }
28
29 }
消费者:
1 public class ReceiveLogs {
2
3 private static final String ExCHANGE_NAME = "logs";
4
5 public static void main(String[] args) throws Exception{
6 ConnectionFactory factory = new ConnectionFactory();
7
8 // 设置IP
9 factory.setHost("127.0.0.1");
10
11 // 设置端口号
12 factory.setPort(5672);
13
14 Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel();
16
17 channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18 String queueName = channel.queueDeclare().getQueue();
19 channel.queueBind(queueName, ExCHANGE_NAME, "");
20
21 System.out.println(" [*] Waiting for messages. To exit press CTRL C");
22
23 DeliverCallback deliverCallback = (sonsumerTag, delivery) -> {
24 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25 System.out.println(" [x] Received '" message "'");
26 };
27 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
28 }
29 }
4.4 Routing(路由)
官方描述:
接上例,我们可能希望将日志消息写入磁盘的程序仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。
简而言之:
如果我们只想接收某些信息,比如日志级别有INFO、ERROR、DEBUG等,我们只愿接收INFO日志。可以使用Routing进行过滤。
代码示例:
生产者:
1 public class EmitLogDirect {
2
3 private static final String EXCHANGE_NAME = "direct_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
18
19 String severity = getServerity(args);
20 String message = getMessage(args);
21
22 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
23 System.out.println(" [x] Sent '" severity "':'" message "'");
24
25 }
26
27 }
28
29 private static String getServerity(String[] strings){
30 if (strings.length < 1){
31 return "info";
32 }
33 return strings[0];
34
35 }
36
37 private static String getMessage(String[] strings){
38 if (strings.length < 2) {
39 return "Hello World!";
40 }
41 return joinStrings(strings, " ", 1);
42 }
43
44 private static String joinStrings(String[] strings, String delimiter, int startIndex){
45 int length = strings.length;
46 if(length == 0){
47 return "";
48 }
49 if(length <= startIndex){
50 return "";
51 }
52 StringBuilder words = new StringBuilder(strings[startIndex]);
53 for (int i = startIndex 1; i < length; i ){
54 words.append(delimiter).append(strings[i]);
55 }
56 return words.toString();
57
58 }
59 }
消费者:
1 public class ReceiveLogsDirect {
2
3 private static final String EXCHANGE_NAME = "direct_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 Connection connection = factory.newConnection();
16
17 Channel channel = connection.createChannel();
18
19 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
20
21 String queueName = channel.queueDeclare().getQueue();
22
23 if(args.length < 1){
24 System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
25 System.exit(1);
26 }
27
28 for (String severity : args){
29 channel.queueBind(queueName, EXCHANGE_NAME, severity);
30 }
31 System.out.println(" [*] Waiting for messages. To exit press CTRL C");
32
33 DeliverCallback deliverCallback = (consumerTag, delivery)->{
34 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
35 System.out.println(" [x] Received '" delivery.getEnvelope().getRoutingKey() "':'" message "'");
36 };
37
38 channel.basicConsume(queueName, true, deliverCallback, comsumerTag ->{});
39 }
40 }
4.5 Topics(主题)
官方描述:
发送到主题交换机的消息不能具有任意的 routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的路由关键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由关键字中可以包含任意多个单词,最多255个字节。
绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换-用特定路由键发送的消息将传递到所有用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:
- *(星)只能代替一个单词。#(散列)可以代替零个或多个单词。
简而言之:
Topic会根据消息自身所携带的路由键(Routing Key)在所有的绑定关系中寻找,与消息相匹配的队列推送该消息。
注意:
当在绑定中不使用特殊字符“ * ”(星号)和“ # ”(哈希)时,主题交换的行为就像直接的一样。
代码示例:
生产者:
1 public class EmitLogTopic {
2
3 private static final String EXCHANGE_NAME = "topic_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8 // 设置IP
9 factory.setHost("127.0.0.1");
10
11 // 设置端口号
12 factory.setPort(5672);
13
14 try(Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel()){
16
17 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18
19 String routingKey = getRouting(args);
20 String message = getMessage(args);
21
22 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
23 System.out.println(" [x] Sent '" routingKey "':'" message "'");
24 }
25 }
26
27 private static String getRouting(String[] strings){
28 if (strings.length < 1){
29 return "anonymous.info";
30 }
31 return strings[0];
32 }
33
34 private static String getMessage(String[] strings){
35 if (strings.length < 2){
36 return "hello world";
37 }
38 return joinStrings(strings, " ", 1);
39 }
40
41 private static String joinStrings(String[] strings, String delimiter, int startIndex){
42 int length = strings.length;
43 if(length == 0){
44 return "";
45 }
46 if(length < startIndex){
47 return "";
48 }
49 StringBuilder words = new StringBuilder(strings[startIndex]);
50 for (int i = startIndex 1; i < length; i ){
51 words.append(delimiter).append(strings[i]);
52 }
53 return words.toString();
54 }
55 }
消费者:
1 public class ReceiveLogTopic {
2
3 private static final String EXCHANGE_NAME = "topic_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8 // 设置IP
9 factory.setHost("127.0.0.1");
10
11 // 设置端口号
12 factory.setPort(5672);
13
14 Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel();
16
17 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18
19 String queueName = channel.queueDeclare().getQueue();
20
21 if(args.length < 1){
22 System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
23 System.exit(1);
24 }
25
26 for (String bindingKey : args){
27 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
28 }
29
30 System.out.println(" [*] Waiting for messages. To exit press CTRL C");
31
32 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
33 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
34 System.out.println(" [x] Received '" delivery.getEnvelope().getRoutingKey() "':'" message "'");
35 };
36 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
37 }
38 }
4.6 RPC(远程过程调用)
官方描述:
尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。
代码示例:
生产者
1 public class RPCServer {
2
3 private static final String RPC_QUEUE_NAME = "rpc_queue";
4
5 private static int fib(int n){
6 if(n == 0){
7 return 0;
8 }
9 if(n == 1){
10 return 1;
11 }
12 return fib(n - 1) fib(n - 2);
13 }
14
15 public static void main(String[] args) throws Exception{
16
17 // 创建服务器的连接
18 ConnectionFactory factory = new ConnectionFactory();
19
20 // 设置IP
21 factory.setHost("127.0.0.1");
22
23 // 设置端口号
24 factory.setPort(5672);
25
26 try (Connection connection = factory.newConnection();
27 Channel channel = connection.createChannel()) {
28 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
29 channel.queuePurge(RPC_QUEUE_NAME);
30
31 channel.basicQos(1);
32
33 System.out.println(" [x] Awaiting RPC requests");
34
35 Object monitor = new Object();
36 DeliverCallback deliverCallback = (consumerTag, delivery) ->{
37 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
38 .Builder()
39 .correlationId(delivery.getProperties().getCorrelationId())
40 .build();
41
42 String response = "";
43
44 try{
45 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
46 int n = Integer.parseInt(message);
47
48 System.out.println(" [.] fib(" message ")");
49 response = fib(n);
50 }catch (RuntimeException e){
51 System.out.println(" [.] " e.toString());
52 }finally {
53 channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
54 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
55
56 // RabbitMq consumer worker thread notifies the RPC server owner thread
57 // RabbitMq使用者工作线程通知RPC服务器所有者线程
58 synchronized (monitor){
59 monitor.notify();
60 }
61 }
62 };
63 channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
64 // Wait and be prepared to consume the message from RPC client.
65 // 等待并准备使用来自RPC客户端的消息。
66 while(true){
67 synchronized (monitor){
68 try {
69 monitor.wait();
70 }catch (InterruptedException e){
71 e.printStackTrace();
72 }
73 }
74 }
75 }
76 }
77 }
消费者:
1 public class RPCClient {
2
3 private Connection connection;
4 private Channel channel;
5 private String requestQueueName = "rpc_queue";
6
7 public RPCClient() throws IOException, TimeoutException {
8 // 创建服务器的连接
9 ConnectionFactory factory = new ConnectionFactory();
10
11 // 设置IP
12 factory.setHost("127.0.0.1");
13
14 // 设置端口号
15 factory.setPort(5672);
16
17 connection = factory.newConnection();
18 channel = connection.createChannel();
19 }
20
21 public static void main(String[] args) throws Exception{
22 RPCClient fibonacciRpc = new RPCClient();
23 for (int i = 0; i < 32; i ) {
24 String i_str = Integer.toString(i);
25 System.out.println(" [x] Requesting fib(" i_str ")");
26 String response = fibonacciRpc.call(i_str);
27 System.out.println(" [.] Got '" response "'");
28 }
29
30 }
31
32 public String call(String message) throws IOException, InterruptedException {
33 final String corrId = UUID.randomUUID().toString();
34
35 String replyQueueName = channel.queueDeclare().getQueue();
36 AMQP.BasicProperties props = new AMQP.BasicProperties
37 .Builder()
38 .correlationId(corrId)
39 .replyTo(replyQueueName)
40 .build();
41
42 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
43
44 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
45
46 String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
47 if (delivery.getProperties().getCorrelationId().equals(corrId)) {
48 response.offer(new String(delivery.getBody(), "UTF-8"));
49 }
50 }, consumerTag -> {
51 });
52
53 String result = response.take();
54 channel.basicCancel(ctag);
55 return result;
56 }
57
58 public void close() throws IOException {
59 connection.close();
60 }
61 }
4.7 Publisher Confirms(发布者确认)
官方描述:
在某些应用程序中,确保将发布的消息发送到代理非常重要。发布者确认是RabbitMQ功能,可以帮助满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有确定的方法可以实现发布者确认,这通常归结为应用程序和整个系统中的约束。典型的技术有:
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待批量确认:简单,合理的吞吐量,但是很难推断出什么时候出了问题。
- 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是可以正确实施。
代码示例:
1 public class PublisherConfirms {
2
3 static final int MESSAGE_COUNT = 50_000;
4
5 static Connection createConnection() throws Exception{
6
7 ConnectionFactory cf = new ConnectionFactory();
8
9 // 设置IP
10 cf.setHost("127.0.0.1");
11
12 // 设置端口号
13 cf.setPort(5672);
14
15 // 设置用户名
16 cf.setUsername("guest");
17
18 // 设置密码
19 cf.setPassword("guest");
20
21 return cf.newConnection();
22 }
23
24 public static void main(String[] args) throws Exception{
25 publishMessagesIndividually();
26 publishMessagesInBatch();
27 handlePublishConfirmsAsynchronously();
28 }
29
30 static void publishMessagesIndividually() throws Exception{
31 try(Connection connection = createConnection()){
32 Channel ch = connection.createChannel();
33
34 String queue = UUID.randomUUID().toString();
35 ch.queueDeclare(queue, false, false, true, null);
36
37 ch.confirmSelect();
38
39 long start = System.nanoTime();
40 for (int i = 0; i < MESSAGE_COUNT; i ){
41 String body = String.valueOf(i);
42 ch.basicPublish("", queue, null, body.getBytes(StandardCharsets.UTF_8));
43 ch.waitForConfirmsOrDie(5_000);
44 }
45 long end = System.nanoTime();
46 System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
47 }
48 }
49
50 static void publishMessagesInBatch() throws Exception {
51 try (Connection connection = createConnection()) {
52 Channel ch = connection.createChannel();
53
54 String queue = UUID.randomUUID().toString();
55 ch.queueDeclare(queue, false, false, true, null);
56
57 ch.confirmSelect();
58
59 int batchSize = 100;
60 int outstandingMessageCount = 0;
61 long start = System.nanoTime();
62 for (int i = 0; i < MESSAGE_COUNT; i ) {
63 String body = String.valueOf(i);
64 ch.basicPublish("", queue, null, body.getBytes());
65 outstandingMessageCount ;
66
67 if (outstandingMessageCount == batchSize) {
68 ch.waitForConfirmsOrDie(5_000);
69 outstandingMessageCount = 0;
70 }
71 }
72
73 if (outstandingMessageCount > 0) {
74 ch.waitForConfirmsOrDie(5_000);
75 }
76 long end = System.nanoTime();
77 System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
78
79 }
80
81 }
82
83 static void handlePublishConfirmsAsynchronously() throws Exception {
84 try (Connection connection = createConnection()) {
85 Channel ch = connection.createChannel();
86
87 String queue = UUID.randomUUID().toString();
88 ch.queueDeclare(queue, false, false, true, null);
89
90 ch.confirmSelect();
91
92 ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
93
94 ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
95 if (multiple) {
96 ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
97 sequenceNumber, true
98 );
99 confirmed.clear();
100 } else {
101 outstandingConfirms.remove(sequenceNumber);
102 }
103 };
104
105 ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
106 String body = outstandingConfirms.get(sequenceNumber);
107 System.err.format(
108 "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
109 body, sequenceNumber, multiple
110 );
111 cleanOutstandingConfirms.handle(sequenceNumber, multiple);
112 });
113
114 long start = System.nanoTime();
115 for (int i = 0; i < MESSAGE_COUNT; i ) {
116 String body = String.valueOf(i);
117 outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
118 ch.basicPublish("", queue, null, body.getBytes());
119 }
120
121 if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
122 throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
123 }
124
125 long end = System.nanoTime();
126 System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
127 }
128 }
129
130 static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
131 int waited = 0;
132 while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
133 Thread.sleep(100L);
134 waited = 100;
135 }
136 return condition.getAsBoolean();
137 }
138 }
五、总结
总的来说,RabbitMQ还是比较简单的。目前文章只是简单记录一下,后期会更深入学习。
,作者: 学海无涯519
原文链接:http://www.cnblogs.com/wgx519/p/14371511.html