RabbitMQ
RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个进程间传递异步消息的网络协议。
Producer(生产者):生产消息
ConnectionFactory(连接工厂):生产Connection的的工厂
Connection(连接):连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。
AMQP连接通常是长连接。AMQP是一个使用 TCP提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。
Channel(信道):网络信道,是建立在Connection连接之上的一种轻量级的连接。几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立多各Channel,每个Channel代表一个会话任务。
大部分的业务操作是在Channel这个接口中完成的,包括:
队列的声明queueDeclare
交换机的声明exchangeDeclare
队列的绑定queueBind
发布消息basicPublish
消费消息basicConsume等。
Broker(中间件)VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
Exchange(交换机):交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
Consumer(消费者):消费者消费消息。在AMQP中,消费者获取待消费消息的途径有两种:
- 消息中间件将消息投递给消费者(push API)
- 消费者主动获取消息 (pull API)
需要注意:多个消费者监听同一个队列时,队列中的消息只会被其中一个消费者消费(并不会每个消费者都消费一次)
Message(消息):消息,服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。属性是对消息进行修饰,比如消息的优先级,延迟等高级特性,主体则就是消息体的内容。
优点:
- 应用耦合
- 异步处理
- 流量削峰
RabbitMQ优势
- 基于AMQP协议
- 高并发(是一个容量的概念,服务器可以接受的最大任务数量)
- 高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)
- 高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)
- 强大的社区支持
- 支持插件
- 支持多语言
RabbitMQ界面
概要
连接
通道
- 一个连接可以创建多个通道
- 一个应用或者一个线程,都是一个通道
- 在通道中创建队列,生产者的通道一般立马关闭,消费者是一只在监听,通道一直存在
交换机
| Type | 解释 |
| —————– | ———————————————————— |
| direct | 它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中 |
| fanout | 它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中 |
| headers | headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。(headers 类型的交换器性能差,不实用,基本不会使用。) |
| topic | 与direct模型相比,多了个可以使用通配符 |
| x-delayed-message | 延迟交换机,可以延迟接收消息 || Features | 解释 |
| ———— | ———————————————————— |
| D | d 是 durable 的缩写,代表这个队列中的消息支持持久化 |
| AD | ad 是 autoDelete 的缩写。代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除。 |
| excl | 是 exclusive 的缩写。代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。 |
| Args | 是 arguments 的缩写。代表该队列配置了 arguments 参数。 |
| TTL | 是 x-message-ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。 |
| Exp | Auto Expire,是 x-expires 配置的缩写。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp。注意这里是删除队列,不是队列中的消息。 |
| Lim | 说明该队列配置了 x-max-length。限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉。 |
| Lim B | 说明队列配置了 x-max-length-bytes。限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小。 |
| DLX | 说明该队列配置了 x-dead-letter-exchange。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。 |
| DLK | x-dead-letter-routing-key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。 |
| Pri | x-max-priority 的缩写,优先级队列。表明该队列支持优先级,先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。 |
| Ovfl | x-overflow 的缩写。队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息。有两个配置项:drop-head,代表丢弃队列头部的消息,默认行为;reject-publish 设置队列中的消息溢出后,该队列的行为:”拒绝接收”(所有消息)。 |
| ha-all | 镜像队列。all 表示镜像到集群上的所有节点,ha-params 参数忽略。 |
七大示例
Hello World
工作队列
假如一共20条消息,A处理时间1s,B处理时间2s
轮询
A,B同时拿到各自的10条消息,然后再处理,所以A经过10s处理完,B要经过20s才能处理完
公平
A和B不能直接拿到所有消息,设置int prefetchCount = 1表示消费者每次只能接收一条消息,只有处理完这条消息,才能处理下一条
1
channel.basicQos(prefetchCount);
这样A处理的快,所以A可以不断从信道中得到消息,B处理的比A慢,间隔2s才可以获得一条消息
**没有资源浪费**
发布订阅(使用交换机,使用广播模式fanout)
- 生产者绑定交换机,将消息发送至交换机
- 交换机生成排他队列,然后将交换机与队列进行绑定
- 消费者监听对应队列的消息
交换机将消息转发至队列,消费者可以收到生产发布的消息。
路由队列(使用交换机,使用路由模式direct)
- 生产者绑定交换机,并且定义不同的routingKey,生产者将携带不同routingKey的消息发送至交换机
- 交换机生成排他队列,同时将队列与交换机进行绑定,此时不同的队列绑定不同的routingKey
- 消费者监听对应队列的消息
交换机将携带不同的routingKey的消息转发至对应routingKey的队列,消费者可以收到生产发布的消息
主题队列(使用交换机,使用主题模式topic)
* 匹配一个字符串
# 匹配0个或多个
队列和交换机绑定才可以使用通配符,发送消息时必须使用具体的路由名称
PRC模式
客户端同时是生产者和消费者,服务端者同时是生产者和消费者
客户端发送请求携带 reply_to 表示服务端收到消息后转发的队列名 以及 correlation_id 表示该消息的唯一id
RabbitMQ消息的事务机制
- 通过AMQP事务机制实现;
- 通过将channel设置成confirm模式来实现
两种事务控制形式不能同时开启
通过AMQP事务机制实现
- txSelect() 将当前channel设置成transaction模式,即开启事务
- txCommit() 提交事务
- txRollback() 回滚事务
在通过 txSelect() 开启事务之后,便可以发布消息给 broker 代理服务器了,如果 txCommit() 提交成功了,则消息一定到达了 broker 了,如果在 txCommit() 执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候便可以捕获异常通过 txRollback() 回滚事务。
缺点:降低RabbitMQ的性能
confirm确认模式
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basicAck 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于它是可以 异步 的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack)或者 nack 一次,但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack。
实现方式
1 |
|
普通 confirm 模式
这是一种简单的确认方式,它是一种 同步 确认发布的方式,也就是发布一个消息之后,只有收到了确认发布,后续的消息才能继续发布,waitForConfirms() 这个方法在消息被确认的时候返回true,如果在指定时间范围内,这个消息没有被确认那么它将返回false。
waitForConfirmsOrDie() 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内,这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有收到确认发布的消息,就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
批量 confirm 模式
先发布一批消息,然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是 同步的,也一样阻塞消息的发布。
异步 confirm 模式
channel 利用回调函数 ConfirmListener() 来达到消息可靠性传递的,这个回调函数只包含 deliveryTag(当前Channel发出的消息序号) ,需要自己手动维护一个 unconfirm 的消息序号集合,每发布一条消息,集合中元素+1 , 每回调一次
handleAck
方法,unconfirm集合删除相应的一条或多条记录1
2
3
4
5
6
7
8
9
10//信道添加监听回调函数
channel.addConfirmListener(new ConfirmListener(){
//重写 handleAck 回调方法
//deliveryTag 消息发送序号
//multiple 是否是多条
@Override
public void handleAck(long deliveryTag , boolean multiple) throws IOException{
处理逻辑
}
})
SpringAMQP
Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案,它是一个抽象层,不依赖于特定的 AMQP Broker 实现和客户端的抽象,所以可以很方便地替换。
邮件发送
邮箱中开启SMTP服务
依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>com.boyolo</groupId>
<artifactId>yeb-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
server:
#端口
port: 8082
spring:
#邮件配置
mail:
#邮件服务器地址
host: smtp.163.com
#协议
protocol: smtp
#编码格式
default-encoding: utf-8
#授权码(在邮箱开通服务时获取)
password: NBFXNQFSTKVDEBTN
#发送者邮箱地址
username: beau_renbo@163.com
#端口(不同邮箱端口号不同)
port: 25
#rabbitmq配置
rabbitmq:
#用户名
username: guest
#密码
password: guest
#服务器地址
host: localhost
#端口
port: 5672
listener:
simple:
#开启手动确认
acknowledge-mode: manual
redis:
#超时时间
timeout: 10000ms
#服务器地址
host: 127.0.0.1
#服务器端口
port: 6379
#数据库
database: 0
password: buzhidao
lettuce:
pool:
#最大连接数
max-active: 1024
#最大连接阻塞等待时间
max-wait: 10000ms
#最大空闲连接
max-idle: 200
#最小空闲连接
min-idle: 5
设置邮件发送常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.boyolo.server.pojo;
/**
* @author renbo
*/
public class MailConstants {
/**
* 消息投递中
*/
public static final Integer DELIVERING = 0;
/**
* 消息投递成功
*/
public static final Integer SUCCESS = 1;
/**
* 消息投递失败
*/
public static final Integer FAILURE = 2;
/**
* 最大尝试次数
*/
public static final Integer MAX_TRY_COUNT = 3;
/**
* 消息超时时间
*/
public static final Integer MSG_TIMEOUT = 1;
/**
* 对列
*/
public static final String MAIL_QUEUE_NAME = "mail.queue";
/**
* 交换机
*/
public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
/**
* 路由键
*/
public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
}
邮件接收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.boyolo.mail;
import com.boyolo.server.pojo.Employee;
import com.boyolo.server.pojo.MailConstants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.mail.MailProperties;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.Date;
/**
* @author renbo
*/
@Component
public class MailReceiver {
//日志
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
//邮件发送
@Autowired
private JavaMailSender javaMailSender;
//邮件配置
@Autowired
private MailProperties mailProperties;
//模版引擎 负责把对象数据呈现成文本数据
/*
1. 指令简单 - 内置不超过10个常用的指令.
2. 基于XPath - 直接使用XPath查找对象,并且可以使用XPath强大的语法和函数.
3. 容易扩展 - 无论是指令还是XPath函数都可以自定义.
4. 解析超快 - 基于其语法特点,解析模板结构非常快.
*/
@Autowired
private TemplateEngine templateEngine;
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
public void handler(Message message, Channel channel) {
Employee employee = (Employee) message.getPayload();
MessageHeaders headers = message.getHeaders();
//消息序号
long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
String msgId = (String) headers.get("spring_returned_message_correlation");
HashOperations hashOperations = redisTemplate.opsForHash();
try {
if (hashOperations.entries("mail_log").containsKey(msgId)) {
LOGGER.error("消息已经被消费==========>{}", msgId);
/**
* 手动确认消息
* tag:消息序号
* multiple:是否确认多条
*/
channel.basicAck(tag, false);
return;
}
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg);
//发件人
helper.setFrom(mailProperties.getUsername());
//收件人
helper.setTo(employee.getEmail());
//主题
helper.setSubject("入职欢迎邮件");
//发送日期
helper.setSentDate(new Date());
//邮件内容
Context context = new Context();
context.setVariable("name", employee.getName());
context.setVariable("posName", employee.getPosition().getName());
context.setVariable("jobLevelName", employee.getJoblevel().getName());
context.setVariable("departmentName", employee.getDepartment().getName());
String mail = templateEngine.process("mail", context);
helper.setText(mail, true);
javaMailSender.send(msg);
LOGGER.info("邮件发送成功");
//将消息Id存入redis
hashOperations.put("mail_log", msgId, "OK");
//手动确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
try {
/**
* 手动确认消息
* tag:消息序号
* multiple:是否确认多条
* requeue:是否退回队列
*/
channel.basicNack(tag, false, true);
} catch (IOException ioException) {
LOGGER.error("邮件发送失败========>{}", e.getMessage());
}
LOGGER.error("邮件发送失败========>{}", e.getMessage());
}
}
}
EmployeeServiceImpl.java 添加员工发送消息时消息可靠性–消息落库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (1 == employeeMapper.insert(employee)) {
Employee emp = employeeMapper.getEmployee(employee.getId()).get(0);
//数据库记录发送的消息
String msgId = UUID.randomUUID().toString();
// String msgId = "123456";
MailLog mailLog = new MailLog();
mailLog.setMsgId(msgId);
mailLog.setEid(employee.getId());
mailLog.setStatus(0);
mailLog.setRouteKey(MailConstants.MAIL_ROUTING_KEY_NAME);
mailLog.setExchange(MailConstants.MAIL_EXCHANGE_NAME);
mailLog.setCount(0);
mailLog.setTryTime(LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT));
mailLog.setCreateTime(LocalDateTime.now());
mailLog.setUpdateTime(LocalDateTime.now());
mailLogMapper.insert(mailLog);
//发送信息
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(msgId));
return RespBean.success("添加成功!");
RabbitMQ配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.boyolo.server.config;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.boyolo.server.pojo.MailConstants;
import com.boyolo.server.pojo.MailLog;
import com.boyolo.server.service.IMailLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq 配置类
*
* @author renbo
*/
@Configuration
public class RabbitMqConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqConfig.class);
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private IMailLogService mailLogService;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
/**
* 消息确认回调,确认消息是否到达broker
* data:消息唯一标识
* ack:确认结果
* cause:失败原因
*/
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
String msgId = data.getId();
if (ack) {
LOGGER.info("{}========>消息发送成功", msgId);
mailLogService.update(new UpdateWrapper<MailLog>().set("status", 1).eq("msgID", msgId));
} else {
LOGGER.error("{}========>消息发送失败", msgId);
}
});
/**
* 消息失败回调 比如route不到queue
* smg:消息主题
* repCode:响应码
* repTExt:响应,描述
* exchange:交换机
* routingkey:路由键
*/
// rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey)
rabbitTemplate.setReturnsCallback((msg) -> {
LOGGER.error("{}========>消息发送queue时失败",msg.getMessage().getBody());
});
return rabbitTemplate;
}
@Bean
public Queue queue() {
return new Queue(MailConstants.MAIL_QUEUE_NAME);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
}
}
邮件发送定时任务–失败重传
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.boyolo.server.task;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.boyolo.server.pojo.Employee;
import com.boyolo.server.pojo.MailConstants;
import com.boyolo.server.pojo.MailLog;
import com.boyolo.server.service.IEmployeeService;
import com.boyolo.server.service.IMailLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
* 邮件发送定时任务
*
* @author renbo
*/
@Component
public class MailTask {
@Autowired
private IMailLogService mailLogService;
@Autowired
private IEmployeeService employeeService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 邮件发送定时任务
* 10s一次
*/
@Scheduled(cron = "0/10 * * * * ?")
public void mailTask() {
List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>().eq("status", 0).lt("tryTime", LocalDateTime.now()));
list.forEach(mailLog -> {
//如果重试次数超过三次,更新状态为投递失败,不再重试
if (3 <= mailLog.getCount()){
mailLogService.update(new UpdateWrapper<MailLog>().set("status",2).eq("msgId",mailLog.getMsgId()));
}
mailLogService.update(new UpdateWrapper<MailLog>().set("count",mailLog.getCount()+1).set("updateTime",LocalDateTime.now()).set("tryTime",LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT)).eq("msgId",mailLog.getMsgId()));
Employee employee = employeeService.getEmployee(mailLog.getEid()).get(0);
//重新发送消息
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,employee,new CorrelationData(mailLog.getMsgId()));
});
}
}
RabbitMQ消息可靠性
保证消息百分百发送到消息队列中去
- 保证mq节点成功接受消息
- 消息发送端需要接受到mq服务端接受到消息的确认应答
- 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理
确保生产者发送的消息被rabbitmq接收了;
确保队列中的消息能够持久化;
创建 queue 的时候将其设置为持久化
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
第二个是发送消息的时候将消息的 deliveryMode 设置为 2
就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行
确保消费者已经把消息处理完了。
RabbitMQ提供了消息确认机制
消费者在订阅队列时,可以指定autoAck参数,
当autoAck等于false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。
当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了这些消息。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
- 本文作者:bobo
- 本文链接:https://boyolo.github.io/article/33708.html
- 版权声明:本博客所有文章均采用 BY-NC-SA 许可协议,转载请注明出处!