作用

  • 解耦

  • 异步

  • 削峰

缺点

  • 系统可用性降低:需要保证消息中间件的正常

  • 系统复杂性提高:需要保证消息的正确使用,重复消费、丢失、顺序问题、一致性问题等都需要考虑

常见消息中间件对比

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

语言

Java

Erlang

Java

Scala

单机吞吐

十万

十万

时效性

ms

us

ms

ms(以内)

可用性

高(主从架构)

高(主从架构)

非常高 (分布式架构)

非常高 (分布式架构)

功能特性

成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好

基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富

MQ功能比较完备,扩展性佳

只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

安装

Erlang

1 下载 erlang 安装包

在官网下载然后上传到 Linux 上或者直接使用下面的命令下载对应的版本。

1
[root@SnailClimb local]#wget https://erlang.org/download/otp_src_19.3.tar.gz

erlang 官网下载:https://www.erlang.org/downloads

2 解压 erlang 安装包

[root@SnailClimb local]#tar -xvzf otp_src_19.3.tar.gz

3 删除 erlang 安装包

[root@SnailClimb local]#rm -rf otp_src_19.3.tar.gz

4 安装 erlang 的依赖工具

[root@SnailClimb local]#yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel

5 进入erlang 安装包解压文件对 erlang 进行安装环境的配置

新建一个文件夹

[root@SnailClimb local]# mkdir erlang

对 erlang 进行安装环境的配置

[root@SnailClimb otp_src_19.3]# 
./configure --prefix=/usr/local/erlang --without-javac

6 编译安装

[root@SnailClimb otp_src_19.3]# 
make && make install

7 验证一下 erlang 是否安装成功了

[root@SnailClimb otp_src_19.3]# ./bin/erl

运行下面的语句输出“hello world”

io:format("hello world~n", []).

8 配置 erlang 环境变量

[root@SnailClimb etc]# vim profile

追加下列环境变量到文件末尾

#erlang
ERL_HOME=/home/jungle/erlang/otp_src_19.3/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

运行下列命令使配置文件profile生效

1
[root@SnailClimb etc]# source /etc/profile

输入 erl 查看 erlang 环境变量是否配置正确

[root@SnailClimb etc]# erl

RabbitMQ

1. 下载rpm

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm

或者直接在官网下载

https://www.rabbitmq.com/install-rpm.html

2. 安装rpm

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

紧接着执行:

yum install rabbitmq-server-3.6.8-1.el7.noarch.rpm

中途需要你输入”y”才能继续安装。

3 开启 web 管理插件

rabbitmq-plugins enable rabbitmq_management

4 设置开机启动

chkconfig rabbitmq-server on

5. 启动服务

service rabbitmq-server start

6. 查看服务状态

service rabbitmq-server status

工作模式

简单模式

工作队列

Work queues

发布/订阅模式

Publish/Subscribe

路由模式

Routing

主题模式

Topics

远程调用

RPC

发送确认

Publisher Confirms

高可用

RabbitMQ分为三种模式:单机普通集群镜像集群

单位:生产不用

普通集群:创建的queue仅保存在一台实例,其他实例拉取该实例的元数据(配置信息,可通过此找到数据真正存放处)

  • 如果消费时连接的不是queue信息保存的该实例,则连接的实例会通过元数据(配置信息)去拉取queue信息,拉取数据有消耗

  • 如果保证消费的queue一直连接固定实例,那就是单机了

  • queue数据仅保存在一台实例,遇到宕机,得等到实例恢复,没用高可用

镜像集群:每个实例都有全部元数据queue

  • 可以保证高可用,但是每次消息更新都需要同步到所有实例,会有很高的带宽消耗,queue的存储大小超过了实例容量,也无法进行扩展

重复消费(保证幂等)

具体业务考虑

可靠性(不丢失)

生产者 -> RabbitMQ ->消费者

生产者弄丢消息

解决方式一:开启事务

try {
    // 通过工厂创建连接
    connection = factory.newConnection();
    // 获取通道
    channel = connection.createChannel();
    // 开启事务
    channel.txSelect();

    // 这里发送消息
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

    // 模拟出现异常
    int result = 1 / 0;

    // 提交事务
    channel.txCommit();
} catch (IOException | TimeoutException e) {
    // 捕捉异常,回滚事务
    channel.txRollback();
}

​ 同步方法,吞吐量太小

解决方式二:使用confirm模式

1.普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。

channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if (!channel.waitForConfirms()) {
    // 消息发送失败
    // ...
}

2.批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务端 confirm。

channel.confirmSelect();
for (int i = 0; i < batchCount; ++i) {
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if (!channel.waitForConfirms()) {
    // 消息发送失败
    // ...
}

3.异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
});

while (true) {
    long nextSeqNo = channel.getNextPublishSeqNo();
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    confirmSet.add(nextSeqNo);
}

RabbitMQ弄丢消息

  • 创建queue的时候设置为持久化,持久化queue的元数据(不会持久化queue里数据)

  • 发送消息时,设置deliveryMode为2,持久化消息

消费者弄丢消息

使用ack机制,在声明队列时,设置noAck=false 使RabbitMQ在收到消费者ack信号后才会在队列中删除消息

保证顺序

每个queue一个consumer