消息队列
队列我们都知道是一种数据结构,从头部插入,从尾部删除。而消息队列就是在这种结构上进行完善而来,消息队列可以帮助程序实现解耦、异步、削峰,从而实现程序的高可用。
说到消息队列,就会有两个概念:生产者和消费者。这两个概念也是非常简单的,就和现实生活中的供给、需求是一样的。生产者生产产品,消费者消费产品。
解耦就是降低耦合度,假如我们需要给多个系统发送消息,就可以引入 MQ 进行处理,我们的系统只用把消息推送到 MQ 中,然后让其他系统订阅我们的主题,只要订阅了,就可以从 MQ 中获取到数据。这样我们就不用考虑调用其它系统时会发生的种种问题了,只要确保消息能够正确到达 MQ 中即可。
异步就非常简单了,现在的语言基本都有异步编程框架了。
削峰就是流量高峰期把 “流量信息” 都发送到 MQ 中,然后后续根据消费者的速度去慢慢处理,知道这些信息被消费者处理完。
典型的就是电商,时不时的搞一些大促(秒杀活动),流量会高于平时几十倍几百倍…大促的时候,举个例子假如是 5000 单,如果说下单要实时操作数据库,假设数据库最大承受每秒 2000 单,那么促销的时候数据库会挂掉导致系统直接不可用,那是多么严重的事情。 所以在这种场景下使用 MQ 完美的解决了这个问题,下游系统下单时只需要往 MQ 里发消息,我的订单系统可以设定消费的频率,比如每秒我就消费 2000 个消息(在数据库的可承受范围),不管你下游系统每秒下多少单,我都保持这个速率,既不会影响订单系统的数据库,也不影响下游系统的下单操作,很好的保护了系统,也提高了下单的吞吐量。
一般来说,大促会持续几分钟,往多了就说 4 个小时吧,每秒 5000,4 小时 7200W 单往 MQ 里写,订单系统每秒消费 2000 单,大促过后,MQ 里会积压 4320W 个消息,剩下的就慢慢消费呗。当然了,大促的时候肯定会临时申请加机器的,每秒消费可能不止2000。
这就是削峰,将某一段时间的超高流量分摊到更长的一段时间内去消化,避免了流量洪峰击垮系统。
想必大家随着开发经验的增长,都会意识到一样东西:引入一个中间件,系统的维护复杂度就会高起来。这也是没办法的。另外我们要确保消息的一致性以及 MQ 系统的高可用,很多系统都依赖于此,MQ 系统是不能出现问题的,一旦出现问题,就会发生“多米诺骨牌”效应。
RabbitMQ
RabbitMQ是轻量级的,在AMQP(Advanced Message Queuing Protocol )基础上实现的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。
一、介绍
RabbitMQ 在许多操作系统和云环境上运行,并为大多数流行语言提供了广泛的开发人员工具。比如提供了 Java、Spring Framework、.NET、Ruby、Python、PHP、JS、GO等语言的客户端、适配器和工具。
1. RabbitMQ的几种模型
graph LR
%% 最简单的模型
subgraph 简单模型
producer((P)) --> queue[[消息队列]] --> consumer((C))
end
%% 工作队列:在工作者之间分配任务(竞争使用者模式)
subgraph 工作队列
producer2((P)) -->
queue2[[消息队列]] --> consumer2-1((C1))
queue2[[消息队列]] --> consumer2-2((C2))
end
%% 发布订阅:一次向多个消费者发送消息
subgraph 发布/订阅
producer3((P)) --> exchange3((X))
exchange3 --> queue3-1[[消息队列]] --> consumer3-1((C1))
exchange3 --> queue3-2[[消息队列]] --> consumer3-2((C2))
end
%% 路由:有选择地接收消息
subgraph 路由
producer4((P)) --> exchange4((X))
exchange4 --error--> queue4-1[[消息队列]] --> consumer4-1((C1))
exchange4 --info--> queue4-2[[消息队列]] --> consumer4-2((C2))
exchange4 --error--> queue4-2[[消息队列]]
exchange4 --warning--> queue4-2[[消息队列]]
end
%% 主题:基于模式接收消息(主题)
subgraph 主题
producer5((P)) --> exchange5((X))
exchange5 --*.exchange--> queue5-1[[消息队列]] --> consumer5-1((C1))
exchange5 --> queue5-2[[消息队列]] --> consumer5-2((C2))
exchange5 --*.type1--> queue5-2[[消息队列]]
exchange5 --*.ly--> queue5-2[[消息队列]]
end
%% RPC
subgraph RPC
producer6((P)) --> exchange6-1[request] --> queue6-1[[消息队列]] --> consumer6((C))
--> queue6-2[[消息队列]] --> exchange6-2[replay] --> producer6
end
二、操作
RabbitMQ的常用命令
rabbitmqctl list_users # 查看用户列表
rabbitmqctl add_user admin 123456 #添加用户名和密码
rabbitmqctl set_permissions -p /admin".*" ".*" ".*" #修改权限
rabbitmqctl set_user_tags admin administrator #添加用户角色
#守护模式启动(后台运行)
rabbitmq-server -detached
# 停止服务
rabbitmqctl stop
# 查看状态
rabbitmqctl status
# 重启 rabbitmq 服务
rabbitmq-server restart
启用插件的同时并启动服务:rabbitmq-plugins enable rabbitmq_management,这个插件是 RabbitMQ 自带的,便于我们查看系统的状态数据,启用这个插件之后才可以使用 http://IP地址:15672 访问网页管理页面,查看具体信息。默认登录的初始账号密码为 guest。
注意: 在较新的版本中,默认的账号只能以本地
localhost的方式访问,如果服务在远程启动,本地进行远程访问操作时登录可能会出现User can only log in via localhost的情况,解决办法就是新添加一个超级管理员账户,使用这个新添加的账户登录。 5672 用于客户端使用,15672 用于网页控制。
镜像队列:(使用最多)
# 策略说明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
# 查看当前策略:
rabbitmqctl list_policies
# 删除队列:
rabbitmqctl clear_policy ha-all
-
-p vhost 可选参数,针对指定 vhost 下的 queue 进行设置 name: policy 的名称 pattern: queue 的匹配模式(正则表达式) definition: 镜像定义,包括三个部分ha-mode、ha-params、ha-sync-mode
-
ha-mode 指明镜像队列的模式,有效值 all/exactly/nodes all 表示在集群所有的节点上进行镜像 exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定 nodes 表示在指定的节点上进行镜像,节点名通过 ha-params 指定
-
ha-params: ha-mode 模式需要用到的参数 ha-sync-mode: 进行队列中消息的同步方式,有效值为 automatic 和 manual priority: 可选参数,policy 的优先级。

说些什么吧!