作用
缺点
常见消息中间件对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|
语言 | Java | Erlang | Java | Scala |
单机吞吐 | 万 | 万 | 十万 | 十万 |
时效性 | ms | us | ms | ms(以内) |
可用性 | 高(主从架构) | 高(主从架构) | 非常高 (分布式架构) | 非常高 (分布式架构) |
功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
安装
Erlang
1 下载 erlang 安装包
在官网下载然后上传到 Linux 上或者直接使用下面的命令下载对应的版本。
1
| [root@SnailClimb local]#wget https://erlang.org/download/otp_src_19.3.tar.gz
|
erlang 官网下载:https://www.erlang.org/downloads
2 解压 erlang 安装包
[root@SnailClimb local]#tar -xvzf otp_src_19.3.tar.gz
|
3 删除 erlang 安装包
[root@SnailClimb local]#rm -rf otp_src_19.3.tar.gz
|
4 安装 erlang 的依赖工具
[root@SnailClimb local]#yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel
|
5 进入erlang 安装包解压文件对 erlang 进行安装环境的配置
新建一个文件夹
[root@SnailClimb local]# mkdir erlang
|
对 erlang 进行安装环境的配置
[root@SnailClimb otp_src_19.3]#
./configure --prefix=/usr/local/erlang --without-javac
|
6 编译安装
[root@SnailClimb otp_src_19.3]#
make && make install
|
7 验证一下 erlang 是否安装成功了
[root@SnailClimb otp_src_19.3]# ./bin/erl
|
运行下面的语句输出“hello world”
io:format("hello world~n", []).
|
8 配置 erlang 环境变量
[root@SnailClimb etc]# vim profile
|
追加下列环境变量到文件末尾
#erlang
ERL_HOME=/home/jungle/erlang/otp_src_19.3/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
|
运行下列命令使配置文件profile
生效
1
| [root@SnailClimb etc]# source /etc/profile
|
输入 erl 查看 erlang 环境变量是否配置正确
[root@SnailClimb etc]# erl
|
RabbitMQ
1. 下载rpm
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm
|
或者直接在官网下载
https://www.rabbitmq.com/install-rpm.html
2. 安装rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
|
紧接着执行:
yum install rabbitmq-server-3.6.8-1.el7.noarch.rpm
|
中途需要你输入”y”才能继续安装。
3 开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
|
4 设置开机启动
chkconfig rabbitmq-server on
|
5. 启动服务
service rabbitmq-server start
|
6. 查看服务状态
service rabbitmq-server status
|
工作模式
简单模式
工作队列
Work queues
发布/订阅模式
Publish/Subscribe
路由模式
Routing
主题模式
Topics
远程调用
RPC
发送确认
Publisher Confirms
高可用
RabbitMQ分为三种模式:单机
、普通集群
、镜像集群
单位
:生产不用
普通集群
:创建的queue
仅保存在一台实例,其他实例拉取该实例的元数据(配置信息,可通过此找到数据真正存放处)
如果消费时连接的不是queue
信息保存的该实例,则连接的实例会通过元数据(配置信息)去拉取queue信息,拉取数据有消耗
如果保证消费的queue
一直连接固定实例,那就是单机了
queue
数据仅保存在一台实例,遇到宕机,得等到实例恢复,没用高可用
镜像集群
:每个实例都有全部元数据
和queue
重复消费(保证幂等)
具体业务考虑
可靠性(不丢失)
生产者 -> RabbitMQ ->消费者
生产者弄丢消息
解决方式一:开启事务
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();
// 这里发送消息
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 模拟出现异常
int result = 1 / 0;
// 提交事务
channel.txCommit();
} catch (IOException | TimeoutException e) {
// 捕捉异常,回滚事务
channel.txRollback();
}
|
同步方法,吞吐量太小
解决方式二:使用confirm模式
1.普通 confirm 模式:每发送一条消息后,调用 waitForConfirms()
方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if (!channel.waitForConfirms()) {
// 消息发送失败
// ...
}
|
2.批量 confirm 模式:每发送一批消息后,调用 waitForConfirms()
方法,等待服务端 confirm。
channel.confirmSelect();
for (int i = 0; i < batchCount; ++i) {
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if (!channel.waitForConfirms()) {
// 消息发送失败
// ...
}
|
3.异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
|
RabbitMQ弄丢消息
消费者弄丢消息
使用ack
机制,在声明队列时,设置noAck=false
使RabbitMQ在收到消费者ack信号后才会在队列中删除消息
保证顺序
每个queue一个consumer